You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/12/06 00:41:39 UTC

[2/2] samza git commit: SAMZA-2012: Add API for wiring an external context through to application processing code

SAMZA-2012: Add API for wiring an external context through to application processing code

This PR also refactors TestSamzaSqlRemoteTable to be in samza-test instead of samza-sql, since it seems to actually be an integration test. It is useful to move that test in this PR so that tests that may need an external context can be consolidated.

Author: Cameron Lee <ca...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Shanthoosh Venkatraman <sv...@linkedin.com>

Closes #829 from cameronlee314/external_context


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e2adf8f9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e2adf8f9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e2adf8f9

Branch: refs/heads/master
Commit: e2adf8f9956879061a76b433369dbe3d5f524044
Parents: 1a7e270
Author: Cameron Lee <ca...@linkedin.com>
Authored: Wed Dec 5 16:41:34 2018 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Dec 5 16:41:34 2018 -0800

----------------------------------------------------------------------
 .../ApplicationContainerContextFactory.java     |  32 ++-
 .../context/ApplicationTaskContextFactory.java  |  41 +++-
 .../java/org/apache/samza/context/Context.java  |  11 +
 .../apache/samza/context/ExternalContext.java   |  33 +++
 .../apache/samza/runtime/ApplicationRunner.java |  10 +-
 .../samza/runtime/TestApplicationRunners.java   |   3 +-
 .../org/apache/samza/context/ContextImpl.java   |  31 ++-
 .../apache/samza/processor/StreamProcessor.java |  19 +-
 .../samza/runtime/ApplicationRunnerMain.java    |   3 +-
 .../samza/runtime/LocalApplicationRunner.java   |  11 +-
 .../samza/runtime/LocalContainerRunner.java     |  42 ++--
 .../samza/runtime/RemoteApplicationRunner.java  |   3 +-
 .../apache/samza/container/SamzaContainer.scala |   8 +-
 .../apache/samza/container/TaskInstance.scala   |  10 +-
 .../samza/job/local/ThreadJobFactory.scala      |  14 +-
 .../org/apache/samza/context/MockContext.java   |  16 +-
 .../apache/samza/context/TestContextImpl.java   |  63 ++---
 .../samza/processor/TestStreamProcessor.java    |   4 +-
 .../runtime/TestApplicationRunnerMain.java      |   3 +-
 .../runtime/TestLocalApplicationRunner.java     |  60 ++++-
 .../org/apache/samza/task/TestAsyncRunLoop.java |   1 +
 .../samza/container/TestSamzaContainer.scala    |  33 +--
 .../samza/container/TestTaskInstance.scala      |  57 +++--
 .../processor/StreamProcessorTestUtils.scala    |   4 +-
 .../samza/sql/client/impl/SamzaExecutor.java    |   4 +-
 .../samza/sql/runner/SamzaSqlApplication.java   |  20 +-
 .../sql/runner/SamzaSqlApplicationRunner.java   |  11 +-
 .../samza/sql/translator/QueryTranslator.java   |  19 +-
 .../samza/sql/e2e/TestSamzaSqlRemoteTable.java  | 231 -------------------
 .../apache/samza/test/framework/TestRunner.java |  16 +-
 .../integration/LocalApplicationRunnerMain.java |  13 +-
 .../EndOfStreamIntegrationTest.java             |   7 +-
 .../WatermarkIntegrationTest.java               |   5 +-
 ...StreamApplicationIntegrationTestHarness.java |   2 +-
 .../processor/TestZkLocalApplicationRunner.java |  34 +--
 .../SamzaSqlIntegrationTestHarness.java         |  32 +++
 .../test/samzasql/TestSamzaSqlEndToEnd.java     |  79 +++----
 .../test/samzasql/TestSamzaSqlRemoteTable.java  | 222 ++++++++++++++++++
 .../apache/samza/test/table/TestLocalTable.java | 196 +++++++---------
 .../samza/test/table/TestRemoteTable.java       |   6 +-
 .../AbstractIntegrationTestHarness.scala        |  16 +-
 41 files changed, 834 insertions(+), 591 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
index 7f2c6a4..499a8a9 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
@@ -19,6 +19,7 @@
 package org.apache.samza.context;
 
 import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 
@@ -35,14 +36,41 @@ import org.apache.samza.application.descriptors.ApplicationDescriptor;
  *
  * @param <T> concrete type of {@link ApplicationContainerContext} created by this factory
  */
+@InterfaceStability.Evolving
 public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> extends Serializable {
-
   /**
    * Creates an instance of the application-defined {@link ApplicationContainerContext}.
+   * <p>
+   * Applications should implement this to provide a context for container initialization.
    *
+   * @param externalContext external context provided for the application; null if it was not provided
    * @param jobContext framework-provided job context
    * @param containerContext framework-provided container context
    * @return a new instance of the application-defined {@link ApplicationContainerContext}
    */
-  T create(JobContext jobContext, ContainerContext containerContext);
+  default T create(ExternalContext externalContext, JobContext jobContext, ContainerContext containerContext) {
+    return create(jobContext, containerContext);
+  }
+
+  /**
+   * New implementations should not implement this directly. Implement
+   * {@link #create(ExternalContext, JobContext, ContainerContext)} instead.
+   * <p>
+   * This is the same as {@link #create(ExternalContext, JobContext, ContainerContext)}, except it does not provide
+   * access to external context.
+   * <p>
+   * This is being left here for backwards compatibility.
+   *
+   * @param jobContext framework-provided job context
+   * @param containerContext framework-provided container context
+   * @return a new instance of the application-defined {@link ApplicationContainerContext}
+   *
+   * Deprecated: Applications should implement {@link #create(ExternalContext, JobContext, ContainerContext)} directly.
+   * This is being left here for backwards compatibility.
+   */
+  @Deprecated
+  default T create(JobContext jobContext, ContainerContext containerContext) {
+    // adding this here so that new apps do not need to implement this
+    throw new UnsupportedOperationException("Please implement a version of create for the factory implementation.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
index 873a2d1..856aff4 100644
--- a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
@@ -19,6 +19,7 @@
 package org.apache.samza.context;
 
 import java.io.Serializable;
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 
@@ -35,17 +36,49 @@ import org.apache.samza.application.descriptors.ApplicationDescriptor;
  *
  * @param <T> concrete type of {@link ApplicationTaskContext} created by this factory
  */
+@InterfaceStability.Evolving
 public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> extends Serializable {
-
   /**
    * Creates an instance of the application-defined {@link ApplicationTaskContext}.
+   * <p>
+   * Applications should implement this to provide a context for task initialization.
+   *
+   * @param externalContext external context provided for the application; null if it was not provided
+   * @param jobContext framework-provided job context
+   * @param containerContext framework-provided container context
+   * @param taskContext framework-provided task context
+   * @param applicationContainerContext application-defined container context; null if it was not provided
+   * @return a new instance of the application-defined {@link ApplicationTaskContext}
+   */
+  default T create(ExternalContext externalContext, JobContext jobContext, ContainerContext containerContext,
+      TaskContext taskContext, ApplicationContainerContext applicationContainerContext) {
+    return create(jobContext, containerContext, taskContext, applicationContainerContext);
+  }
+
+  /**
+   * New implementations should not implement this directly. Implement
+   * {@link #create(ExternalContext, JobContext, ContainerContext, TaskContext, ApplicationContainerContext)} instead.
+   * <p>
+   * This is the same as
+   * {@link #create(ExternalContext, JobContext, ContainerContext, TaskContext, ApplicationContainerContext)}, except it
+   * does not provide access to external context.
+   * <p>
+   * This is being left here for backwards compatibility.
    *
    * @param jobContext framework-provided job context
    * @param containerContext framework-provided container context
    * @param taskContext framework-provided task context
-   * @param applicationContainerContext application-defined container context
+   * @param applicationContainerContext application-defined container context; null if it was not provided
    * @return a new instance of the application-defined {@link ApplicationTaskContext}
+   *
+   * Deprecated: Applications should implement
+   * {@link #create(ExternalContext, JobContext, ContainerContext, TaskContext, ApplicationContainerContext)} directly.
+   * This is being left here for backwards compatibility.
    */
-  T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
-      ApplicationContainerContext applicationContainerContext);
+  @Deprecated
+  default T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
+      ApplicationContainerContext applicationContainerContext) {
+    // adding this here so that new apps do not need to implement this
+    throw new UnsupportedOperationException("Please implement a version of create for the factory implementation.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-api/src/main/java/org/apache/samza/context/Context.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/Context.java b/samza-api/src/main/java/org/apache/samza/context/Context.java
index 44c8a05..8167e2a 100644
--- a/samza-api/src/main/java/org/apache/samza/context/Context.java
+++ b/samza-api/src/main/java/org/apache/samza/context/Context.java
@@ -19,6 +19,8 @@
 package org.apache.samza.context;
 
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.runtime.ApplicationRunner;
+
 
 /**
  * A holder for all framework and application defined contexts at runtime.
@@ -76,4 +78,13 @@ public interface Context {
    * @throws IllegalStateException if no {@link ApplicationTaskContextFactory} was provided for the application
    */
   ApplicationTaskContext getApplicationTaskContext();
+
+  /**
+   * Gets the {@link ExternalContext} that was created outside of the application.
+   * <p>
+   * Use {@link ApplicationRunner#run(ExternalContext)} to provide this context.
+   *
+   * @return the external context provided for the application
+   */
+  ExternalContext getExternalContext();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-api/src/main/java/org/apache/samza/context/ExternalContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/ExternalContext.java b/samza-api/src/main/java/org/apache/samza/context/ExternalContext.java
new file mode 100644
index 0000000..15ecac8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/context/ExternalContext.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+/**
+ * An {@link ExternalContext} can be used to pass components created and managed outside of Samza into a Samza
+ * application. This will be made accessible through the {@link Context}.
+ * <p>
+ * This is passed to {@link org.apache.samza.runtime.ApplicationRunner#run(ExternalContext)} and propagated down to the
+ * {@link Context} object provided to tasks.
+ * <p>
+ * {@link ExternalContext} can be used to inject objects that need to be created by other frameworks, such as Spring.
+ * <p>
+ * This is currently just a marker interface for the object passed into Samza.
+ */
+public interface ExternalContext {
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 59543c0..a7690c9 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -20,6 +20,7 @@ package org.apache.samza.runtime;
 
 import java.time.Duration;
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 
 
@@ -32,12 +33,19 @@ import org.apache.samza.job.ApplicationStatus;
  */
 @InterfaceStability.Evolving
 public interface ApplicationRunner {
+  /**
+   * This is like {@link #run(ExternalContext)}, except it provides a null {@link ExternalContext}.
+   */
+  default void run() {
+    run(null);
+  }
 
   /**
    * Deploy and run the Samza jobs to execute {@link org.apache.samza.application.SamzaApplication}.
    * It is non-blocking so it doesn't wait for the application running.
+   * @param externalContext nullable {@link ExternalContext} to pass through to the application
    */
-  void run();
+  void run(ExternalContext externalContext);
 
   /**
    * Kill the Samza jobs represented by {@link org.apache.samza.application.SamzaApplication}

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java b/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
index 5829cf7..139ed69 100644
--- a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
+++ b/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
@@ -25,6 +25,7 @@ import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 import org.junit.Test;
 
@@ -60,7 +61,7 @@ public class TestApplicationRunners {
     }
 
     @Override
-    public void run() {
+    public void run(ExternalContext externalContext) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
index 93c7eb1..8edd2bf 100644
--- a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
@@ -30,6 +30,7 @@ public class ContextImpl implements Context {
   private final TaskContext taskContext;
   private final Optional<ApplicationContainerContext> applicationContainerContextOptional;
   private final Optional<ApplicationTaskContext> applicationTaskContextOptional;
+  private final Optional<ExternalContext> externalContextOptional;
 
   /**
    * @param jobContext non-null job context
@@ -37,17 +38,18 @@ public class ContextImpl implements Context {
    * @param taskContext non-null framework task context
    * @param applicationContainerContextOptional optional application-defined container context
    * @param applicationTaskContextOptional optional application-defined task context
+   * @param externalContextOptional optional external context
    */
-  public ContextImpl(JobContext jobContext,
-      ContainerContext containerContext,
-      TaskContext taskContext,
+  public ContextImpl(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
       Optional<ApplicationContainerContext> applicationContainerContextOptional,
-      Optional<ApplicationTaskContext> applicationTaskContextOptional) {
+      Optional<ApplicationTaskContext> applicationTaskContextOptional,
+      Optional<ExternalContext> externalContextOptional) {
     this.jobContext = Preconditions.checkNotNull(jobContext, "Job context can not be null");
     this.containerContext = Preconditions.checkNotNull(containerContext, "Container context can not be null");
     this.taskContext = Preconditions.checkNotNull(taskContext, "Task context can not be null");
     this.applicationContainerContextOptional = applicationContainerContextOptional;
     this.applicationTaskContextOptional = applicationTaskContextOptional;
+    this.externalContextOptional = externalContextOptional;
   }
 
   @Override
@@ -67,21 +69,25 @@ public class ContextImpl implements Context {
 
   @Override
   public ApplicationContainerContext getApplicationContainerContext() {
-    if (!this.applicationContainerContextOptional.isPresent()) {
-      throw new IllegalStateException("No application-defined container context exists");
-    }
+    Preconditions.checkState(this.applicationContainerContextOptional.isPresent(),
+        "No application-defined container context exists");
     return this.applicationContainerContextOptional.get();
   }
 
   @Override
   public ApplicationTaskContext getApplicationTaskContext() {
-    if (!this.applicationTaskContextOptional.isPresent()) {
-      throw new IllegalStateException("No application-defined task context exists");
-    }
+    Preconditions.checkState(this.applicationTaskContextOptional.isPresent(),
+        "No application-defined task context exists");
     return this.applicationTaskContextOptional.get();
   }
 
   @Override
+  public ExternalContext getExternalContext() {
+    Preconditions.checkState(this.externalContextOptional.isPresent(), "No external context exists");
+    return this.externalContextOptional.get();
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) {
       return true;
@@ -93,12 +99,13 @@ public class ContextImpl implements Context {
     return Objects.equals(jobContext, context.jobContext) && Objects.equals(containerContext, context.containerContext)
         && Objects.equals(taskContext, context.taskContext) && Objects.equals(applicationContainerContextOptional,
         context.applicationContainerContextOptional) && Objects.equals(applicationTaskContextOptional,
-        context.applicationTaskContextOptional);
+        context.applicationTaskContextOptional) && Objects.equals(externalContextOptional,
+        context.externalContextOptional);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(jobContext, containerContext, taskContext, applicationContainerContextOptional,
-        applicationTaskContextOptional);
+        applicationTaskContextOptional, externalContextOptional);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 34d67cc..389dafd 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -40,6 +40,7 @@ import org.apache.samza.context.ApplicationContainerContext;
 import org.apache.samza.context.ApplicationContainerContextFactory;
 import org.apache.samza.context.ApplicationTaskContext;
 import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -115,6 +116,7 @@ public class StreamProcessor {
    * context object.
    */
   private final Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional;
+  private final Optional<ExternalContext> externalContextOptional;
   private final Map<String, MetricsReporter> customMetricsReporter;
   private final Config config;
   private final long taskShutdownMs;
@@ -169,7 +171,7 @@ public class StreamProcessor {
    * @param taskFactory task factory to instantiate the Task
    * @param processorListener listener to the StreamProcessor life cycle
    *
-   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional,
    * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
    */
   @Deprecated
@@ -179,7 +181,7 @@ public class StreamProcessor {
   }
 
   /**
-   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional,
    * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences:
    * <ol>
    *   <li>Passes null for application-defined context factories</li>
@@ -193,14 +195,14 @@ public class StreamProcessor {
    * @param processorListener listener to the StreamProcessor life cycle
    * @param jobCoordinator the instance of {@link JobCoordinator}
    *
-   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional,
    * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
    */
   @Deprecated
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
-    this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), sp -> processorListener,
-        jobCoordinator);
+    this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(),
+        sp -> processorListener, jobCoordinator);
   }
 
   /**
@@ -211,19 +213,22 @@ public class StreamProcessor {
    * @param taskFactory task factory to instantiate the Task
    * @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context
    * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
+   * @param externalContextOptional optional {@link ExternalContext} to pass through to the application
    * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
    * @param jobCoordinator the instance of {@link JobCoordinator}
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
       Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
-      StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) {
+      Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory,
+      JobCoordinator jobCoordinator) {
     Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null.");
     this.config = config;
     this.customMetricsReporter = customMetricsReporters;
     this.taskFactory = taskFactory;
     this.applicationDefinedContainerContextFactoryOptional = applicationDefinedContainerContextFactoryOptional;
     this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional;
+    this.externalContextOptional = externalContextOptional;
     this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
     this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : createJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
@@ -318,7 +323,7 @@ public class StreamProcessor {
         this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
         Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
-        null);
+        Option.apply(this.externalContextOptional.orElse(null)), null);
   }
 
   private JobCoordinator createJobCoordinator() {

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 17a9dc1..fccde1c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -26,7 +26,6 @@ import org.apache.samza.config.Config;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.Util;
 
-
 /**
  * This class contains the main() method used by run-app.sh.
  * It creates the {@link ApplicationRunner} based on the config, and then run the application.
@@ -59,7 +58,7 @@ public class ApplicationRunnerMain {
 
     switch (op) {
       case RUN:
-        appRunner.run();
+        appRunner.run(null);
         break;
       case KILL:
         appRunner.kill();

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 85306ef..189fc1f 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -39,6 +40,7 @@ import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.metrics.MetricsReporter;
@@ -85,7 +87,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
   }
 
   @Override
-  public void run() {
+  public void run(ExternalContext externalContext) {
     try {
       List<JobConfig> jobConfigs = planner.prepareJobs();
 
@@ -96,7 +98,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
       jobConfigs.forEach(jobConfig -> {
           LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
           StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
-              sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig));
+              sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig), Optional.ofNullable(externalContext));
           processors.add(processor);
         });
       numProcessorsToStart.set(processors.size());
@@ -161,14 +163,15 @@ public class LocalApplicationRunner implements ApplicationRunner {
 
   @VisibleForTesting
   StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
-      StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory) {
+      StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,
+      Optional<ExternalContext> externalContextOptional) {
     TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
     Map<String, MetricsReporter> reporters = new HashMap<>();
     // TODO: the null processorId has to be fixed after SAMZA-1835
     appDesc.getMetricsReporterFactories().forEach((name, factory) ->
         reporters.put(name, factory.getMetricsReporter(name, null, config)));
     return new StreamProcessor(config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
-        appDesc.getApplicationTaskContextFactory(), listenerFactory, null);
+        appDesc.getApplicationTaskContextFactory(), externalContextOptional, listenerFactory, null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index a5a45ba..7e71b00 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -19,28 +19,24 @@
 
 package org.apache.samza.runtime;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.samza.container.ContainerHeartbeatClient;
-import org.apache.samza.container.ContainerHeartbeatMonitor;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.SamzaContainer;
-import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.container.SamzaContainerListener;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.slf4j.MDC;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
-import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.container.SamzaContainer$;
+import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
@@ -48,7 +44,12 @@ import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 import scala.Option;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
 
 /**
  * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
@@ -87,13 +88,13 @@ public class LocalContainerRunner {
 
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
-    run(appDesc, containerId, jobModel, config);
+    run(appDesc, containerId, jobModel, config, buildExternalContext(config));
 
     System.exit(0);
   }
 
   private static void run(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, String containerId,
-      JobModel jobModel, Config config) {
+      JobModel jobModel, Config config, Optional<ExternalContext> externalContextOptional) {
     TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
     LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
     SamzaContainer container = SamzaContainer$.MODULE$.apply(
@@ -104,7 +105,7 @@ public class LocalContainerRunner {
         JobContextImpl.fromConfigWithDefaults(config),
         Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
         Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
-        localityManager);
+        Option.apply(externalContextOptional.orElse(null)), localityManager);
 
     ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
         .createInstance(new ProcessorContext() { }, config);
@@ -153,6 +154,15 @@ public class LocalContainerRunner {
     }
   }
 
+  private static Optional<ExternalContext> buildExternalContext(Config config) {
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
+     * a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
+     * future, components like the application descriptor may not be available to LocalContainerRunner.
+     */
+    return Optional.empty();
+  }
+
   // TODO: this is going away when SAMZA-1168 is done and the initialization of metrics reporters are done via
   // LocalApplicationRunner#createStreamProcessor()
   private static Map<String, MetricsReporter> loadMetricsReporters(

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 1b38c9b..da4fab2 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -28,6 +28,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
@@ -60,7 +61,7 @@ public class RemoteApplicationRunner implements ApplicationRunner {
   }
 
   @Override
-  public void run() {
+  public void run(ExternalContext externalContext) {
     try {
       List<JobConfig> jobConfigs = planner.prepareJobs();
       if (jobConfigs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 865658f..e163d7d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -128,6 +128,7 @@ object SamzaContainer extends Logging {
     jobContext: JobContext,
     applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
     applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
+    externalContextOption: Option[ExternalContext],
     localityManager: LocalityManager = null) = {
     val config = jobContext.getConfig
     val containerModel = jobModel.getContainers.get(containerId)
@@ -489,7 +490,7 @@ object SamzaContainer extends Logging {
 
     val containerContext = new ContainerContextImpl(containerModel, samzaContainerMetrics.registry)
     val applicationContainerContextOption = applicationContainerContextFactoryOption
-      .map(_.create(jobContext, containerContext))
+      .map(_.create(externalContextOption.orNull, jobContext, containerContext))
 
     val storeWatchPaths = new util.HashSet[Path]()
 
@@ -668,7 +669,8 @@ object SamzaContainer extends Logging {
           jobContext = jobContext,
           containerContext = containerContext,
           applicationContainerContextOption = applicationContainerContextOption,
-          applicationTaskContextFactoryOption = applicationTaskContextFactoryOption)
+          applicationTaskContextFactoryOption = applicationTaskContextFactoryOption,
+          externalContextOption = externalContextOption)
 
       val taskInstance = createTaskInstance(task)
 
@@ -747,6 +749,7 @@ object SamzaContainer extends Logging {
       timerExecutor = timerExecutor,
       containerContext = containerContext,
       applicationContainerContextOption = applicationContainerContextOption,
+      externalContextOption = externalContextOption,
       containerStorageManager = containerStorageManager)
   }
 
@@ -783,6 +786,7 @@ class SamzaContainer(
   timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor,
   containerContext: ContainerContext,
   applicationContainerContextOption: Option[ApplicationContainerContext],
+  externalContextOption: Option[ExternalContext],
   containerStorageManager: ContainerStorageManager) extends Runnable with Logging {
 
   val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 53e5af7..ef89581 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -63,7 +63,8 @@ class TaskInstance(
   jobContext: JobContext,
   containerContext: ContainerContext,
   applicationContainerContextOption: Option[ApplicationContainerContext],
-  applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
+  applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
+  externalContextOption: Option[ExternalContext]
 ) extends Logging {
 
   val taskName: TaskName = taskModel.getTaskName
@@ -88,11 +89,12 @@ class TaskInstance(
   private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
     new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
   // need separate field for this instead of using it through Context, since Context throws an exception if it is null
-  private val applicationTaskContextOption = applicationTaskContextFactoryOption.map(_.create(jobContext,
-    containerContext, taskContext, applicationContainerContextOption.orNull))
+  private val applicationTaskContextOption = applicationTaskContextFactoryOption
+    .map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext,
+      applicationContainerContextOption.orNull))
   val context = new ContextImpl(jobContext, containerContext, taskContext,
     Optional.ofNullable(applicationContainerContextOption.orNull),
-    Optional.ofNullable(applicationTaskContextOption.orNull))
+    Optional.ofNullable(applicationTaskContextOption.orNull), Optional.ofNullable(externalContextOption.orNull))
 
   // store the (ssp -> if this ssp has caught up) mapping. "caught up"
   // means the same ssp in other taskInstances have the same offset as

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index e4a7838..f505f22 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -25,7 +25,7 @@ import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
-import org.apache.samza.context.JobContextImpl
+import org.apache.samza.context.{ExternalContext, JobContextImpl}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
@@ -121,7 +121,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         taskFactory,
         JobContextImpl.fromConfigWithDefaults(config),
         Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
-        Option(appDesc.getApplicationTaskContextFactory.orElse(null))
+        Option(appDesc.getApplicationTaskContextFactory.orElse(null)),
+        buildExternalContext(config)
       )
       container.setContainerListener(containerListener)
 
@@ -135,4 +136,13 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       }
     }
   }
+
+  private def buildExternalContext(config: Config): Option[ExternalContext] = {
+    /*
+     * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
+     * a non-empty ExternalContext to SamzaContainer. Only config should be used to build the external context. In the
+     * future, components like the application descriptor may not be available.
+     */
+    None
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/java/org/apache/samza/context/MockContext.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/MockContext.java b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
index fdab692..2ece9f1 100644
--- a/samza-core/src/test/java/org/apache/samza/context/MockContext.java
+++ b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
@@ -36,6 +36,7 @@ public class MockContext implements Context {
   private final TaskContextImpl taskContext = mock(TaskContextImpl.class);
   private final ApplicationContainerContext applicationContainerContext = mock(ApplicationContainerContext.class);
   private final ApplicationTaskContext applicationTaskContext = mock(ApplicationTaskContext.class);
+  private final ExternalContext externalContext = mock(ExternalContext.class);
 
   public MockContext() {
     this(new MapConfig(
@@ -52,26 +53,31 @@ public class MockContext implements Context {
 
   @Override
   public JobContext getJobContext() {
-    return jobContext;
+    return this.jobContext;
   }
 
   @Override
   public ContainerContext getContainerContext() {
-    return containerContext;
+    return this.containerContext;
   }
 
   @Override
   public TaskContext getTaskContext() {
-    return taskContext;
+    return this.taskContext;
   }
 
   @Override
   public ApplicationContainerContext getApplicationContainerContext() {
-    return applicationContainerContext;
+    return this.applicationContainerContext;
   }
 
   @Override
   public ApplicationTaskContext getApplicationTaskContext() {
-    return applicationTaskContext;
+    return this.applicationTaskContext;
+  }
+
+  @Override
+  public ExternalContext getExternalContext() {
+    return this.externalContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
index 40526db..7ae81dd 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
@@ -31,7 +31,7 @@ public class TestContextImpl {
    */
   @Test
   public void testGetApplicationContainerContext() {
-    MockApplicationContainerContext applicationContainerContext = new MockApplicationContainerContext();
+    ApplicationContainerContext applicationContainerContext = mock(ApplicationContainerContext.class);
     Context context = buildWithApplicationContainerContext(applicationContainerContext);
     assertEquals(applicationContainerContext, context.getApplicationContainerContext());
   }
@@ -50,7 +50,7 @@ public class TestContextImpl {
    */
   @Test
   public void testGetApplicationTaskContext() {
-    MockApplicationTaskContext applicationTaskContext = new MockApplicationTaskContext();
+    ApplicationTaskContext applicationTaskContext = mock(ApplicationTaskContext.class);
     Context context = buildWithApplicationTaskContext(applicationTaskContext);
     assertEquals(applicationTaskContext, context.getApplicationTaskContext());
   }
@@ -64,43 +64,44 @@ public class TestContextImpl {
     context.getApplicationTaskContext();
   }
 
-  private static Context buildWithApplicationContainerContext(ApplicationContainerContext applicationContainerContext) {
-    return buildWithApplicationContext(applicationContainerContext, mock(ApplicationTaskContext.class));
+  /**
+   * Given a concrete context, getExternalContext should return it.
+   */
+  @Test
+  public void testGetExternalContext() {
+    ExternalContext externalContext = mock(ExternalContext.class);
+    Context context = buildWithExternalContext(externalContext);
+    assertEquals(externalContext, context.getExternalContext());
   }
 
-  private static Context buildWithApplicationTaskContext(ApplicationTaskContext applicationTaskContext) {
-    return buildWithApplicationContext(mock(ApplicationContainerContext.class), applicationTaskContext);
+  /**
+   * Given no concrete context, getExternalContext should throw an exception.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testGetMissingExternalContext() {
+    Context context = buildWithExternalContext(null);
+    context.getExternalContext();
   }
 
-  private static Context buildWithApplicationContext(ApplicationContainerContext applicationContainerContext,
-      ApplicationTaskContext applicationTaskContext) {
-    return new ContextImpl(mock(JobContext.class), mock(ContainerContext.class), mock(TaskContext.class),
-        Optional.ofNullable(applicationContainerContext), Optional.ofNullable(applicationTaskContext));
+  private static Context buildWithApplicationContainerContext(ApplicationContainerContext applicationContainerContext) {
+    return buildWithApplicationContext(applicationContainerContext, mock(ApplicationTaskContext.class),
+        mock(ExternalContext.class));
   }
 
-  /**
-   * Simple empty implementation for testing.
-   */
-  private class MockApplicationContainerContext implements ApplicationContainerContext {
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
+  private static Context buildWithApplicationTaskContext(ApplicationTaskContext applicationTaskContext) {
+    return buildWithApplicationContext(mock(ApplicationContainerContext.class), applicationTaskContext,
+        mock(ExternalContext.class));
   }
 
-  /**
-   * Simple empty implementation for testing.
-   */
-  private class MockApplicationTaskContext implements ApplicationTaskContext {
-    @Override
-    public void start() {
-    }
+  private static Context buildWithExternalContext(ExternalContext externalContext) {
+    return buildWithApplicationContext(mock(ApplicationContainerContext.class), mock(ApplicationTaskContext.class),
+        externalContext);
+  }
 
-    @Override
-    public void stop() {
-    }
+  private static Context buildWithApplicationContext(ApplicationContainerContext applicationContainerContext,
+      ApplicationTaskContext applicationTaskContext, ExternalContext externalContext) {
+    return new ContextImpl(mock(JobContext.class), mock(ContainerContext.class), mock(TaskContext.class),
+        Optional.ofNullable(applicationContainerContext), Optional.ofNullable(applicationTaskContext),
+        Optional.ofNullable(externalContext));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index b002e2a..1c33b96 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -21,6 +21,7 @@ package org.apache.samza.processor;
 import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -467,7 +468,8 @@ public class TestStreamProcessor {
   public void testStreamProcessorWithStreamProcessorListenerFactory() {
     AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>();
     StreamProcessor streamProcessor =
-        new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), null, null,
+        new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), Optional.empty(),
+            Optional.empty(), Optional.empty(),
             sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)),
             mock(JobCoordinator.class));
     assertEquals(streamProcessor, mockListener.get().processor);

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index cfa2680..e3e8eea 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -23,6 +23,7 @@ import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.MockStreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 import org.junit.Test;
 
@@ -87,7 +88,7 @@ public class TestApplicationRunnerMain {
     }
 
     @Override
-    public void run() {
+    public void run(ExternalContext externalContext) {
       runCount++;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 1b93072..5bd7893 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.LegacyTaskApplication;
@@ -33,6 +34,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.execution.LocalJobPlanner;
@@ -45,7 +47,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -67,8 +69,7 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunStreamTask()
-      throws Exception {
+  public void testRunStreamTask() {
     final Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
@@ -90,7 +91,40 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
+    doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
+
+    runner.run(externalContext);
+
+    assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
+  }
+
+  @Test
+  public void testRunStreamTaskWithoutExternalContext() {
+    final Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+    cfgs.put(JobConfig.JOB_ID(), "jobId");
+    config = new MapConfig(cfgs);
+    mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
+    prepareTest();
+
+    StreamProcessor sp = mock(StreamProcessor.class);
+
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> captor =
+        ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
+
+    doAnswer(i ->
+      {
+        ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+        listener.afterStart();
+        listener.afterStop();
+        return null;
+      }).when(sp).start();
+
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.empty()));
     doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
 
     runner.run();
@@ -99,8 +133,7 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunComplete()
-      throws Exception {
+  public void testRunComplete() {
     Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     config = new MapConfig(cfgs);
@@ -125,17 +158,18 @@ public class TestLocalApplicationRunner {
         return null;
       }).when(sp).start();
 
-    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
 
-    runner.run();
+    runner.run(externalContext);
     runner.waitForFinish();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
 
   @Test
-  public void testRunFailure()
-      throws Exception {
+  public void testRunFailure() {
     Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.PROCESSOR_ID, "0");
     config = new MapConfig(cfgs);
@@ -157,10 +191,12 @@ public class TestLocalApplicationRunner {
         throw new Exception("test failure");
       }).when(sp).start();
 
-    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());
+    ExternalContext externalContext = mock(ExternalContext.class);
+    doReturn(sp).when(runner)
+        .createStreamProcessor(anyObject(), anyObject(), captor.capture(), eq(Optional.of(externalContext)));
 
     try {
-      runner.run();
+      runner.run(externalContext);
       runner.waitForFinish();
     } catch (Throwable th) {
       assertNotNull(th);

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index be8d344..f8c40dc 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -108,6 +108,7 @@ public class TestAsyncRunLoop {
         mock(JobContext.class),
         mock(ContainerContext.class),
         Option.apply(null),
+        Option.apply(null),
         Option.apply(null));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 49a4e84..d1f60bc 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -64,12 +64,13 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Mock
   private var metrics: SamzaContainerMetrics = null
   @Mock
+  private var localityManager: LocalityManager = null
+  @Mock
   private var containerContext: ContainerContext = null
   @Mock
   private var applicationContainerContext: ApplicationContainerContext = null
   @Mock
   private var samzaContainerListener: SamzaContainerListener = null
-
   @Mock
   private var containerStorageManager: ContainerStorageManager = null
 
@@ -264,28 +265,15 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
 
   @Test
   def testStoreContainerLocality():Unit = {
-    val localityManager: LocalityManager = Mockito.mock[LocalityManager](classOf[LocalityManager])
-    val containerContext: ContainerContext = Mockito.mock[ContainerContext](classOf[ContainerContext])
+    this.config = new MapConfig(Map(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED -> "true"))
+    setupSamzaContainer(None) // re-init with an actual config
     val containerModel: ContainerModel = Mockito.mock[ContainerModel](classOf[ContainerModel])
     val testContainerId = "1"
     Mockito.when(containerModel.getId).thenReturn(testContainerId)
-    Mockito.when(containerContext.getContainerModel).thenReturn(containerModel)
+    Mockito.when(this.containerContext.getContainerModel).thenReturn(containerModel)
 
-    val samzaContainer: SamzaContainer = new SamzaContainer(
-      new MapConfig(Map(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED -> "true")),
-      Map(TASK_NAME -> this.taskInstance),
-      this.runLoop,
-      this.systemAdmins,
-      this.consumerMultiplexer,
-      this.producerMultiplexer,
-      metrics,
-      containerContext = containerContext,
-      applicationContainerContextOption = null,
-      localityManager = localityManager,
-      containerStorageManager = Mockito.mock(classOf[ContainerStorageManager]))
-
-    samzaContainer.storeContainerLocality
-    Mockito.verify(localityManager).writeContainerToHostMapping(any(), any())
+    this.samzaContainer.storeContainerLocality
+    Mockito.verify(this.localityManager).writeContainerToHostMapping(any(), any())
   }
 
   private def setupSamzaContainer(applicationContainerContext: Option[ApplicationContainerContext]) {
@@ -296,9 +284,12 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       this.systemAdmins,
       this.consumerMultiplexer,
       this.producerMultiplexer,
-      metrics,
+      this.metrics,
+      localityManager = this.localityManager,
       containerContext = this.containerContext,
-      applicationContainerContextOption = applicationContainerContext, containerStorageManager = containerStorageManager)
+      applicationContainerContextOption = applicationContainerContext,
+      externalContextOption = None,
+      containerStorageManager = containerStorageManager)
     this.samzaContainer.setContainerListener(this.samzaContainerListener)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 15534cd..97dc886 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -34,7 +34,7 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-import org.mockito.{Matchers, Mock, MockitoAnnotations}
+import org.mockito.{ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
@@ -77,6 +77,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
   private var applicationTaskContextFactory: ApplicationTaskContextFactory[ApplicationTaskContext] = null
   @Mock
   private var applicationTaskContext: ApplicationTaskContext = null
+  @Mock
+  private var externalContext: ExternalContext = null
 
   private var taskInstance: TaskInstance = null
 
@@ -86,8 +88,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     // not using Mockito mock since Mockito doesn't work well with the call-by-name argument in maybeHandle
     this.taskInstanceExceptionHandler = new MockTaskInstanceExceptionHandler
     when(this.taskModel.getTaskName).thenReturn(TASK_NAME)
-    when(this.applicationTaskContextFactory.create(Matchers.eq(this.jobContext), Matchers.eq(this.containerContext),
-      any(), Matchers.eq(this.applicationContainerContext)))
+    when(this.applicationTaskContextFactory.create(Matchers.eq(this.externalContext), Matchers.eq(this.jobContext),
+      Matchers.eq(this.containerContext), any(), Matchers.eq(this.applicationContainerContext)))
       .thenReturn(this.applicationTaskContext)
     when(this.systemAdmins.getSystemAdmin(SYSTEM_NAME)).thenReturn(this.systemAdmin)
     setupTaskInstance(Some(this.applicationTaskContextFactory))
@@ -121,6 +123,30 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
+  def testInitTask(): Unit = {
+    this.taskInstance.initTask
+
+    val contextCaptor = ArgumentCaptor.forClass(classOf[Context])
+    verify(this.task).init(contextCaptor.capture())
+    val actualContext = contextCaptor.getValue
+    assertEquals(this.jobContext, actualContext.getJobContext)
+    assertEquals(this.containerContext, actualContext.getContainerContext)
+    assertEquals(this.taskModel, actualContext.getTaskContext.getTaskModel)
+    assertEquals(this.applicationContainerContext, actualContext.getApplicationContainerContext)
+    assertEquals(this.applicationTaskContext, actualContext.getApplicationTaskContext)
+    assertEquals(this.externalContext, actualContext.getExternalContext)
+
+    verify(this.applicationTaskContext).start()
+  }
+
+  @Test
+  def testShutdownTask(): Unit = {
+    this.taskInstance.shutdownTask
+    verify(this.applicationTaskContext).stop()
+    verify(this.task).close()
+  }
+
+  @Test
   def testOffsetsAreUpdatedOnProcess() {
     when(this.metrics.processes).thenReturn(mock[Counter])
     when(this.metrics.messagesActuallyProcessed).thenReturn(mock[Counter])
@@ -202,22 +228,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
   }
 
   /**
-    * Given that an application task context factory is provided, then lifecycle calls should be made and the context
-    * should be accessible.
-    */
-  @Test
-  def testApplicationTaskContextFactoryProvided(): Unit = {
-    assertEquals(this.applicationTaskContext, this.taskInstance.context.getApplicationTaskContext)
-    this.taskInstance.initTask
-    verify(this.applicationTaskContext).start()
-    verify(this.applicationTaskContext, never()).stop()
-    this.taskInstance.shutdownTask
-    verify(this.applicationTaskContext).stop()
-  }
-
-  /**
-    * Given that no application task context factory is provided, then no lifecycle calls should be made. Also, an
-    * exception should be thrown if the application task context is accessed.
+    * Given that no application task context factory is provided, then no lifecycle calls should be made.
     */
   @Test
   def testNoApplicationTaskContextFactoryProvided() {
@@ -225,9 +236,6 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     this.taskInstance.initTask
     this.taskInstance.shutdownTask
     verifyZeroInteractions(this.applicationTaskContext)
-    intercept[IllegalStateException] {
-      this.taskInstance.context.getApplicationTaskContext
-    }
   }
 
   @Test(expected = classOf[SystemProducerException])
@@ -257,13 +265,14 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
       jobContext = this.jobContext,
       containerContext = this.containerContext,
       applicationContainerContextOption = Some(this.applicationContainerContext),
-      applicationTaskContextFactoryOption = applicationTaskContextFactory)
+      applicationTaskContextFactoryOption = applicationTaskContextFactory,
+      externalContextOption = Some(this.externalContext))
   }
 
   /**
     * Task type which has all task traits, which can be mocked.
     */
-  trait AllTask extends StreamTask with InitableTask with WindowableTask {}
+  trait AllTask extends StreamTask with InitableTask with ClosableTask with WindowableTask {}
 
   /**
     * Mock version of [TaskInstanceExceptionHandler] which just does a passthrough execution and keeps track of the

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 69bae4b..9bb485a 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -57,7 +57,8 @@ object StreamProcessorTestUtils {
       jobContext = Mockito.mock(classOf[JobContext]),
       containerContext = containerContext,
       applicationContainerContextOption = None,
-      applicationTaskContextFactoryOption = None)
+      applicationTaskContextFactoryOption = None,
+      externalContextOption = None)
 
     val container = new SamzaContainer(
       config = config,
@@ -69,6 +70,7 @@ object StreamProcessorTestUtils {
       metrics = new SamzaContainerMetrics,
       containerContext = containerContext,
       applicationContainerContextOption = None,
+      externalContextOption = None,
       containerStorageManager = Mockito.mock(classOf[ContainerStorageManager]))
     container
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
----------------------------------------------------------------------
diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
index 7c2ca32..bf72464 100755
--- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
+++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java
@@ -169,7 +169,7 @@ public class SamzaExecutor implements SqlExecutor {
     SamzaSqlApplicationRunner runner;
     try {
       runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-      runner.run();
+      runner.run(null);
     } catch (SamzaException ex) {
       lastErrorMsg = ex.toString();
       LOG.error(lastErrorMsg);
@@ -242,7 +242,7 @@ public class SamzaExecutor implements SqlExecutor {
     SamzaSqlApplicationRunner runner;
     try {
       runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
-      runner.run();
+      runner.run(null);
     } catch (SamzaException ex) {
       lastErrorMsg = ex.toString();
       LOG.error(lastErrorMsg);

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 868f5a2..4304b65 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -27,6 +27,13 @@ import java.util.Map;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
 import org.apache.samza.sql.translator.QueryTranslator;
@@ -82,11 +89,14 @@ public class SamzaSqlApplication implements StreamApplication {
        * container, so it does not need to be serialized. Therefore, the translatorContext is recreated in each container
        * and does not need to be serialized.
        */
-      appDescriptor.withApplicationTaskContextFactory((jobContext,
-          containerContext,
-          taskContext,
-          applicationContainerContext) ->
-          new SamzaSqlApplicationContext(translatorContextMap));
+      appDescriptor.withApplicationTaskContextFactory(new ApplicationTaskContextFactory<SamzaSqlApplicationContext>() {
+        @Override
+        public SamzaSqlApplicationContext create(ExternalContext externalContext, JobContext jobContext,
+            ContainerContext containerContext, TaskContext taskContext,
+            ApplicationContainerContext applicationContainerContext) {
+          return new SamzaSqlApplicationContext(translatorContextMap);
+        }
+      });
     } catch (RuntimeException e) {
       LOG.error("SamzaSqlApplication threw exception.", e);
       throw e;

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index d9a44ec..3ff170a 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
@@ -118,14 +119,18 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
   }
 
   public void runAndWaitForFinish() {
+    runAndWaitForFinish(null);
+  }
+
+  public void runAndWaitForFinish(ExternalContext externalContext) {
     Validate.isTrue(runner instanceof LocalApplicationRunner, "This method can be called only in standalone mode.");
-    run();
+    run(externalContext);
     waitForFinish();
   }
 
   @Override
-  public void run() {
-    runner.run();
+  public void run(ExternalContext externalContext) {
+    runner.run(externalContext);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/e2adf8f9/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index fef5471..12e83c3 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -35,7 +35,13 @@ import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -113,11 +119,14 @@ public class QueryTranslator {
     translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId);
     Map<Integer, TranslatorContext> translatorContexts = new HashMap<>();
     translatorContexts.put(queryId, translatorContext.clone());
-    appDesc.withApplicationTaskContextFactory((jobContext,
-        containerContext,
-        taskContext,
-        applicationContainerContext) ->
-        new SamzaSqlApplicationContext(translatorContexts));
+    appDesc.withApplicationTaskContextFactory(new ApplicationTaskContextFactory<SamzaSqlApplicationContext>() {
+      @Override
+      public SamzaSqlApplicationContext create(ExternalContext externalContext, JobContext jobContext,
+          ContainerContext containerContext, TaskContext taskContext,
+          ApplicationContainerContext applicationContainerContext) {
+        return new SamzaSqlApplicationContext(translatorContexts);
+      }
+    });
   }
 
   /**