You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/10 22:34:27 UTC

[4/5] samza git commit: SAMZA-1714: Creating shared context factory for shared context objects

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 63e269d..88644ce 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -30,7 +30,6 @@ import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
@@ -50,20 +49,20 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   private final ControlMessageSender controlMessageSender;
 
   PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
-      SystemStream systemStream, TaskContext context) {
+      SystemStream systemStream, Context context) {
     this.partitionByOpSpec = partitionByOpSpec;
     this.systemStream = systemStream;
     this.keyFunction = partitionByOpSpec.getKeyFunction();
     this.valueFunction = partitionByOpSpec.getValueFunction();
-    this.taskName = context.getTaskName().getTaskName();
-    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache();
+    this.taskName = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
+    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context.getTaskContext()).getStreamMetadataCache();
     this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
-    this.keyFunction.init(config, context);
-    this.valueFunction.init(config, context);
+  protected void handleInit(Context context) {
+    this.keyFunction.init(context);
+    this.valueFunction.init(context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 5ce1328..be3e0a3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -18,18 +18,17 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
+import java.util.Collection;
+import java.util.Collections;
+
 
 /**
  * Implementation of a send-stream-to-table operator that stores the record
@@ -43,13 +42,13 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
   private final SendToTableOperatorSpec<K, V> sendToTableOpSpec;
   private final ReadWriteTable<K, V> table;
 
-  SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Config config, TaskContext context) {
+  SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
     this.sendToTableOpSpec = sendToTableOpSpec;
-    this.table = (ReadWriteTable) context.getTable(sendToTableOpSpec.getTableSpec().getId());
+    this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableSpec().getId());
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
+  protected void handleInit(Context context) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 5dbe27f..6fe9006 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
@@ -38,14 +37,14 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final SinkOperatorSpec<M> sinkOpSpec;
   private final SinkFunction<M> sinkFn;
 
-  SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec, Config config, TaskContext context) {
+  SinkOperatorImpl(SinkOperatorSpec<M> sinkOpSpec) {
     this.sinkOpSpec = sinkOpSpec;
     this.sinkFn = sinkOpSpec.getSinkFn();
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
-    this.sinkFn.init(config, context);
+  protected void handleInit(Context context) {
+    this.sinkFn.init(context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 6cd426b..1a615bd 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -18,12 +18,11 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.Collection;
@@ -46,8 +45,8 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
-    transformFn.init(config, context);
+  protected void handleInit(Context context) {
+    transformFn.init(context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 54a5770..d44241d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -18,18 +18,17 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
+import java.util.Collection;
+import java.util.Collections;
+
 
 /**
  * Implementation of a stream-table join operator that first retrieve the value of
@@ -45,15 +44,14 @@ class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M
   private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
   private final ReadableTable<K, ?> table;
 
-  StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec,
-      Config config, TaskContext context) {
+  StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
     this.joinOpSpec = joinOpSpec;
-    this.table = (ReadableTable) context.getTable(joinOpSpec.getTableSpec().getId());
+    this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableSpec().getId());
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
-    this.joinOpSpec.getJoinFn().init(config, context);
+  protected void handleInit(Context context) {
+    this.joinOpSpec.getJoinFn().init(context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index b175671..c09c5f8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -21,14 +21,13 @@
 package org.apache.samza.operators.impl;
 
 import com.google.common.base.Preconditions;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKey;
 import org.apache.samza.operators.impl.store.TimeSeriesStore;
 import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
-import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.triggers.FiringType;
@@ -45,9 +44,9 @@ import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.storage.kv.ClosableIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.TimestampedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,23 +110,23 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
   }
 
   @Override
-  protected void handleInit(Config config, TaskContext context) {
+  protected void handleInit(Context context) {
 
     KeyValueStore<TimeSeriesKey<K>, Object> store =
-        (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId());
+        (KeyValueStore<TimeSeriesKey<K>, Object>) context.getTaskContext().getStore(windowOpSpec.getOpId());
 
     if (initializer != null) {
-      initializer.init(config, context);
+      initializer.init(context);
     }
 
     if (keyFn != null) {
-      keyFn.init(config, context);
+      keyFn.init(context);
     }
 
     // For aggregating windows, we use the store in over-write mode since we only retain the aggregated
     // value. Else, we use the store in append-mode.
     if (foldLeftFn != null) {
-      foldLeftFn.init(config, context);
+      foldLeftFn.init(context);
       timeSeriesStore = new TimeSeriesStoreImpl(store, false);
     } else {
       timeSeriesStore = new TimeSeriesStoreImpl(store, true);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
index 4e640dc..c1d62f5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
@@ -20,12 +20,11 @@ package org.apache.samza.operators.spec;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -50,8 +49,8 @@ class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> {
       }
 
       @Override
-      public void init(Config config, TaskContext context) {
-        filterFn.init(config, context);
+      public void init(Context context) {
+        filterFn.init(context);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
index 6ce522f..d3a587a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
@@ -20,12 +20,11 @@ package org.apache.samza.operators.spec;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -53,8 +52,8 @@ class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
       }
 
       @Override
-      public void init(Config config, TaskContext context) {
-        mapFn.init(config, context);
+      public void init(Context context) {
+        mapFn.init(context);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 26e52f2..3149989 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,6 +36,11 @@ import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -46,6 +52,8 @@ import org.apache.samza.util.ScalaJavaUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+
 
 /**
  * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
@@ -97,6 +105,16 @@ public class StreamProcessor {
   private final JobCoordinator jobCoordinator;
   private final ProcessorLifecycleListener processorListener;
   private final TaskFactory taskFactory;
+  /**
+   * Type parameter needs to be {@link ApplicationContainerContext} so that we can eventually call the base methods of
+   * the context object.
+   */
+  private final Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional;
+  /**
+   * Type parameter needs to be {@link ApplicationTaskContext} so that we can eventually call the base methods of the
+   * context object.
+   */
+  private final Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional;
   private final Map<String, MetricsReporter> customMetricsReporter;
   private final Config config;
   private final long taskShutdownMs;
@@ -143,57 +161,60 @@ public class StreamProcessor {
   JobCoordinatorListener jobCoordinatorListener = null;
 
   /**
-   * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}.
-   *
-   * <p>
-   * On startup, StreamProcessor starts the JobCoordinator. Schedules the SamzaContainer to run in a ExecutorService
-   * when it receives new {@link JobModel} from JobCoordinator.
-   * <p>
-   *
-   * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor.
+   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except
+   * it creates a {@link JobCoordinator} instead of accepting it as an argument.
    *
-   * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}.
-   * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics.
-   * @param taskFactory the {@link TaskFactory} to be used for creating task instances.
-   * @param processorListener listener to the StreamProcessor life cycle.
+   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
    */
+  @Deprecated
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       ProcessorLifecycleListener processorListener) {
     this(config, customMetricsReporters, taskFactory, processorListener, null);
   }
 
   /**
-   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener)}, except the
-   * {@link JobCoordinator} is given for this {@link StreamProcessor}.
-   * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}
-   * @param customMetricsReporters metric Reporter
-   * @param taskFactory task factory to instantiate the Task
+   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences:
+   * <ol>
+   *   <li>Passes {@link Optional#empty()} for the application-defined context factories</li>
+   *   <li>Accepts a {@link ProcessorLifecycleListener} directly instead of a
+   *   {@link StreamProcessorLifecycleListenerFactory}</li>
+   * </ol>
+   * Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
+   * StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
+   *
    * @param processorListener listener to the StreamProcessor life cycle
-   * @param jobCoordinator the instance of {@link JobCoordinator}
    */
+  @Deprecated
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
       ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
-    this(config, customMetricsReporters, taskFactory, sp -> processorListener, jobCoordinator);
+    this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), sp -> processorListener,
+        jobCoordinator);
   }
 
   /**
-   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except
-   * there is a {@link StreamProcessorLifecycleListenerFactory} as input instead of {@link ProcessorLifecycleListener}.
-   * This is useful to create a {@link ProcessorLifecycleListener} with a reference to this {@link StreamProcessor}
+   * Builds a {@link StreamProcessor} with full specification of processing components.
    *
    * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}
-   * @param customMetricsReporters metric Reporter
+   * @param customMetricsReporters registered with the metrics system to report metrics
    * @param taskFactory task factory to instantiate the Task
-   * @param listenerFactory listener to the StreamProcessor life cycle
+   * @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context
+   * @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
+   * @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
    * @param jobCoordinator the instance of {@link JobCoordinator}
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
+      Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
+      Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
       StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) {
     Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null.");
-    this.taskFactory = taskFactory;
     this.config = config;
-    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
     this.customMetricsReporter = customMetricsReporters;
+    this.taskFactory = taskFactory;
+    this.applicationDefinedContainerContextFactoryOptional = applicationDefinedContainerContextFactoryOptional;
+    this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional;
+    this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
     this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : createJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
@@ -283,7 +304,10 @@ public class StreamProcessor {
 
   @VisibleForTesting
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
-    return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
+    return SamzaContainer.apply(processorId, jobModel, ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
+        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
+        Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
+        Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)));
   }
 
   private JobCoordinator createJobCoordinator() {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7100482..a5eeba1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -167,7 +167,8 @@ public class LocalApplicationRunner implements ApplicationRunner {
     // TODO: the null processorId has to be fixed after SAMZA-1835
     appDesc.getMetricsReporterFactories().forEach((name, factory) ->
         reporters.put(name, factory.getMetricsReporter(name, null, config)));
-    return new StreamProcessor(config, reporters, taskFactory, listenerFactory, null);
+    return new StreamProcessor(config, reporters, taskFactory, appDesc.getApplicationContainerContextFactory(),
+        appDesc.getApplicationTaskContextFactory(), listenerFactory, null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index add7e69..94ff1eb 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -25,8 +25,8 @@ import java.util.Random;
 import org.slf4j.MDC;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -36,6 +36,7 @@ import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
 import org.apache.samza.container.SamzaContainerListener;
+import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.TaskFactory;
@@ -44,6 +45,8 @@ import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+
 
 /**
  * Launches and manages the lifecycle for {@link SamzaContainer}s in YARN.
@@ -93,9 +96,11 @@ public class LocalContainerRunner {
     SamzaContainer container = SamzaContainer$.MODULE$.apply(
         containerId,
         jobModel,
-        config,
         ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, config)),
-        taskFactory);
+        taskFactory,
+        JobContextImpl.fromConfigWithDefaults(config),
+        Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
+        Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)));
 
     ProcessorLifecycleListener listener = appDesc.getProcessorLifecycleListenerFactory()
         .createInstance(new ProcessorContext() { }, config);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 9a76d75..be074ee 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -27,13 +27,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.ContainerContextImpl;
+import org.apache.samza.context.JobContextImpl;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.job.model.ContainerModel;
@@ -209,8 +210,7 @@ public class StorageRecovery extends CommandLine {
 
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
-      SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getId(), jobConfig, containerModel.getTasks()
-          .keySet(), new MetricsRegistryMap());
+      ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap());
 
       for (TaskModel taskModel : containerModel.getTasks().values()) {
         HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
@@ -233,6 +233,7 @@ public class StorageRecovery extends CommandLine {
                 null,
                 new MetricsRegistryMap(),
                 changeLogSystemStreamPartition,
+                JobContextImpl.fromConfigWithDefaults(jobConfig),
                 containerContext);
             taskStores.put(storeName, storageEngine);
           }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index ae72414..d7b15a4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -18,21 +18,19 @@
  */
 package org.apache.samza.table;
 
-import java.util.HashMap;
-import java.util.Map;
-
+import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
 
 
 /**
@@ -97,12 +95,10 @@ public class TableManager {
 
   /**
    * Initialize table providers with container and task contexts
-   * @param containerContext context for the Samza container
-   * @param taskContext context for the current task, nullable for global tables
+   * @param context context for the task
    */
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    Preconditions.checkNotNull(containerContext, "null container context.");
-    tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+  public void init(Context context) {
+    tableContexts.values().forEach(ctx -> ctx.tableProvider.init(context));
     initialized = true;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index b7aa33c..32d2bed 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -19,25 +19,23 @@
 
 package org.apache.samza.table.caching;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.utils.DefaultTableReadMetrics;
 import org.apache.samza.table.utils.DefaultTableWriteMetrics;
 import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
 
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 
 /**
@@ -91,10 +89,10 @@ public class CachingTable<K, V> implements ReadWriteTable<K, V> {
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
-    writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+  public void init(Context context) {
+    readMetrics = new DefaultTableReadMetrics(context, this, tableId);
+    writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
     tableMetricsUtil.newGauge("hit-rate", () -> hitRate());
     tableMetricsUtil.newGauge("miss-rate", () -> missRate());
     tableMetricsUtil.newGauge("req-count", () -> requestCount());

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index d5f7767..c959a56 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -54,13 +54,13 @@ public class CachingTableProvider extends BaseTableProvider {
   @Override
   public Table getTable() {
     String realTableId = tableSpec.getConfig().get(REAL_TABLE_ID);
-    ReadableTable table = (ReadableTable) taskContext.getTable(realTableId);
+    ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
 
     String cacheTableId = tableSpec.getConfig().get(CACHE_TABLE_ID);
     ReadWriteTable cache;
 
     if (cacheTableId != null) {
-      cache = (ReadWriteTable) taskContext.getTable(cacheTableId);
+      cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
     } else {
       cache = createDefaultCacheTable(realTableId);
       defaultCaches.add(cache);
@@ -68,7 +68,7 @@ public class CachingTableProvider extends BaseTableProvider {
 
     boolean isWriteAround = Boolean.parseBoolean(tableSpec.getConfig().get(WRITE_AROUND));
     CachingTable cachingTable = new CachingTable(tableSpec.getId(), table, cache, isWriteAround);
-    cachingTable.init(containerContext, taskContext);
+    cachingTable.init(this.context);
     return cachingTable;
   }
 
@@ -97,7 +97,7 @@ public class CachingTableProvider extends BaseTableProvider {
         readTtlMs, writeTtlMs, cacheSize));
 
     GuavaCacheTable cacheTable = new GuavaCacheTable(tableId + "-def-cache", cacheBuilder.build());
-    cacheTable.init(containerContext, taskContext);
+    cacheTable.init(this.context);
 
     return cacheTable;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index a8beb3b..5f77ee4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -19,19 +19,17 @@
 
 package org.apache.samza.table.caching.guava;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
+import com.google.common.cache.Cache;
 import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
 
-import com.google.common.cache.Cache;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 
 /**
@@ -54,8 +52,8 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+  public void init(Context context) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
     // hit- and miss-rate are provided by CachingTable.
     tableMetricsUtil.newGauge("evict-count", () -> cache.stats().evictionCount());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index 1513249..39f332e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -47,7 +47,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
   public Table getTable() {
     Cache guavaCache = SerdeUtils.deserialize(GUAVA_CACHE, tableSpec.getConfig().get(GUAVA_CACHE));
     GuavaCacheTable table = new GuavaCacheTable(tableSpec.getId(), guavaCache);
-    table.init(containerContext, taskContext);
+    table.init(this.context);
     guavaTables.add(table);
     return table;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
index 9ef4c1b..4cbc270 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
@@ -19,21 +19,19 @@
 
 package org.apache.samza.table.remote;
 
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.DefaultTableWriteMetrics;
 import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 
 /**
@@ -63,10 +61,10 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    super.init(containerContext, taskContext);
-    writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+  public void init(Context context) {
+    super.init(context);
+    writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
     writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
index b3d82f3..9487e39 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
@@ -19,28 +19,26 @@
 
 package org.apache.samza.table.remote;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.utils.DefaultTableReadMetrics;
 import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 
 /**
@@ -110,9 +108,9 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, this, tableId);
+  public void init(Context context) {
+    readMetrics = new DefaultTableReadMetrics(context, this, tableId);
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
     readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index cae0bbd..9415e70 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -19,14 +19,6 @@
 
 package org.apache.samza.table.remote;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.retry.RetriableReadFunction;
@@ -37,6 +29,14 @@ import org.apache.samza.table.utils.SerdeUtils;
 import org.apache.samza.table.utils.TableMetricsUtil;
 import org.apache.samza.util.RateLimiter;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
 import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
 import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
 
@@ -83,7 +83,7 @@ public class RemoteTableProvider extends BaseTableProvider {
     TableReadFunction readFn = getReadFn();
     RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
     if (rateLimiter != null) {
-      rateLimiter.init(containerContext.config, taskContext);
+      rateLimiter.init(this.context);
     }
     TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(READ_CREDIT_FN);
     TableRateLimiter readRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, readCreditFn, RL_READ_TAG);
@@ -150,7 +150,7 @@ public class RemoteTableProvider extends BaseTableProvider {
           writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
     }
 
-    TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+    TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
     if (readRetryPolicy != null) {
       ((RetriableReadFunction) readFn).setMetrics(metricsUtil);
     }
@@ -158,7 +158,7 @@ public class RemoteTableProvider extends BaseTableProvider {
       ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
     }
 
-    table.init(containerContext, taskContext);
+    table.init(this.context);
     tables.add(table);
     return table;
   }
@@ -184,7 +184,7 @@ public class RemoteTableProvider extends BaseTableProvider {
   private TableReadFunction<?, ?> getReadFn() {
     TableReadFunction<?, ?> readFn = deserializeObject(READ_FN);
     if (readFn != null) {
-      readFn.init(containerContext.config, taskContext);
+      readFn.init(this.context);
     }
     return readFn;
   }
@@ -192,7 +192,7 @@ public class RemoteTableProvider extends BaseTableProvider {
   private TableWriteFunction<?, ?> getWriteFn() {
     TableWriteFunction<?, ?> writeFn = deserializeObject(WRITE_FN);
     if (writeFn != null) {
-      writeFn.init(containerContext.config, taskContext);
+      writeFn.init(this.context);
     }
     return writeFn;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
index 960e2a4..dfbd835 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/BaseTableProvider.java
@@ -22,10 +22,9 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,8 +38,7 @@ abstract public class BaseTableProvider implements TableProvider {
 
   final protected TableSpec tableSpec;
 
-  protected SamzaContainerContext containerContext;
-  protected TaskContext taskContext;
+  protected Context context;
 
   public BaseTableProvider(TableSpec tableSpec) {
     this.tableSpec = tableSpec;
@@ -50,9 +48,8 @@ abstract public class BaseTableProvider implements TableProvider {
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    this.containerContext = containerContext;
-    this.taskContext = taskContext;
+  public void init(Context context) {
+    this.context = context;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
index 2acd082..090c8c1 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableReadMetrics.java
@@ -18,11 +18,10 @@
  */
 package org.apache.samza.table.utils;
 
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -39,14 +38,12 @@ public class DefaultTableReadMetrics {
   /**
    * Constructor based on container and task container context
    *
-   * @param containerContext container context
-   * @param taskContext task context
+   * @param context {@link Context} for this task
    * @param table underlying table
    * @param tableId table Id
    */
-  public DefaultTableReadMetrics(SamzaContainerContext containerContext, TaskContext taskContext,
-      Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+  public DefaultTableReadMetrics(Context context, Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
     getNs = tableMetricsUtil.newTimer("get-ns");
     getAllNs = tableMetricsUtil.newTimer("getAll-ns");
     numGets = tableMetricsUtil.newCounter("num-gets");

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
index a32d6d5..69d4ef2 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/DefaultTableWriteMetrics.java
@@ -18,11 +18,10 @@
  */
 package org.apache.samza.table.utils;
 
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
-import org.apache.samza.task.TaskContext;
 
 
 public class DefaultTableWriteMetrics {
@@ -43,14 +42,12 @@ public class DefaultTableWriteMetrics {
   /**
    * Utility class that contains the default set of write metrics.
    *
-   * @param containerContext container context
-   * @param taskContext task context
+   * @param context {@link Context} for this task
    * @param table underlying table
    * @param tableId table Id
    */
-  public DefaultTableWriteMetrics(SamzaContainerContext containerContext, TaskContext taskContext,
-      Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
+  public DefaultTableWriteMetrics(Context context, Table table, String tableId) {
+    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
     putNs = tableMetricsUtil.newTimer("put-ns");
     putAllNs = tableMetricsUtil.newTimer("putAll-ns");
     deleteNs = tableMetricsUtil.newTimer("delete-ns");

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
index 6805c64..1b19272 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
@@ -19,18 +19,16 @@
 
 package org.apache.samza.table.utils;
 
-import java.util.function.Supplier;
-
 import com.google.common.base.Preconditions;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.caching.SupplierGauge;
-import org.apache.samza.task.TaskContext;
+
+import java.util.function.Supplier;
 
 
 /**
@@ -46,21 +44,16 @@ public class TableMetricsUtil {
   /**
    * Constructor based on container context
    *
-   * @param containerContext container context
-   * @param taskContext task context
+   * @param context {@link Context} for this task
    * @param table underlying table
    * @param tableId table Id
    */
-  public TableMetricsUtil(SamzaContainerContext containerContext, TaskContext taskContext,
-      Table table, String tableId) {
-
-    Preconditions.checkNotNull(containerContext);
+  public TableMetricsUtil(Context context, Table table, String tableId) {
+    Preconditions.checkNotNull(context);
     Preconditions.checkNotNull(table);
     Preconditions.checkNotNull(tableId);
 
-    this.metricsRegistry = taskContext == null // The table is at container level, when the task
-        ? containerContext.metricsRegistry     // context passed in is null
-        : taskContext.getMetricsRegistry();
+    this.metricsRegistry = context.getTaskContext().getTaskMetricsRegistry();
     this.groupName = table.getClass().getSimpleName();
     this.tableId = tableId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 111869c..6c255f1 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -33,16 +33,15 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.util.HighResolutionClock;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.HighResolutionClock;
 import org.apache.samza.util.Throttleable;
 import org.apache.samza.util.ThrottlingScheduler;
 import org.slf4j.Logger;
@@ -374,7 +373,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         }, commitMs, commitMs, TimeUnit.MILLISECONDS);
       }
 
-      final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler();
+      final EpochTimeScheduler epochTimeScheduler = task.epochTimeScheduler();
       if (epochTimeScheduler != null) {
         epochTimeScheduler.registerListener(() -> {
             state.needScheduler();

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
index e2fea95..fcd9766 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -20,7 +20,7 @@
 package org.apache.samza.task;
 
 import java.util.concurrent.ExecutorService;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 
 
@@ -40,9 +40,9 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi
   }
 
   @Override
-  public void init(Config config, TaskContext context) throws Exception {
+  public void init(Context context) throws Exception {
     if (wrappedTask instanceof InitableTask) {
-      ((InitableTask) wrappedTask).init(config, context);
+      ((InitableTask) wrappedTask).init(context);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index aa896c2..218ba5d 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,14 +18,13 @@
  */
 package org.apache.samza.task;
 
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.system.EndOfStreamMessage;
-import org.apache.samza.system.MessageType;
-import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.MessageType;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.util.Clock;
@@ -42,8 +41,6 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
   private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
 
   private final OperatorSpecGraph specGraph;
-  // TODO: to be replaced by proper scope of shared context factory in SAMZA-1714
-  private final ContextManager contextManager;
   private final Clock clock;
 
   private OperatorImplGraph operatorImplGraph;
@@ -52,17 +49,15 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
    * Constructs an adaptor task to run the user-implemented {@link OperatorSpecGraph}.
    * @param specGraph the serialized version of user-implemented {@link OperatorSpecGraph}
    *                  that includes the logical DAG
-   * @param contextManager the {@link ContextManager} used to set up the shared context used by operators in the DAG
    * @param clock the {@link Clock} to use for time-keeping
    */
-  public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager, Clock clock) {
+  public StreamOperatorTask(OperatorSpecGraph specGraph, Clock clock) {
     this.specGraph = specGraph.clone();
-    this.contextManager = contextManager;
     this.clock = clock;
   }
 
-  public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager) {
-    this(specGraph, contextManager, SystemClock.instance());
+  public StreamOperatorTask(OperatorSpecGraph specGraph) {
+    this(specGraph, SystemClock.instance());
   }
 
   /**
@@ -75,20 +70,13 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
    * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph}
    * corresponding to the logical DAG.
    *
-   * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
    * @param context allows initializing and accessing contextual data of this StreamTask
    * @throws Exception in case of initialization errors
    */
   @Override
-  public final void init(Config config, TaskContext context) throws Exception {
-
-    // get the user-implemented per task context manager and initialize it
-    if (this.contextManager != null) {
-      this.contextManager.init(config, context);
-    }
-
+  public final void init(Context context) throws Exception {
     // create the operator impl DAG corresponding to the logical operator spec DAG
-    this.operatorImplGraph = new OperatorImplGraph(specGraph, config, context, clock);
+    this.operatorImplGraph = new OperatorImplGraph(specGraph, context, clock);
   }
 
   /**
@@ -133,9 +121,6 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
 
   @Override
   public void close() throws Exception {
-    if (this.contextManager != null) {
-      this.contextManager.close();
-    }
     if (operatorImplGraph != null) {
       operatorImplGraph.close();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index 834777b..c312fac 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -48,8 +48,8 @@ public class TaskFactoryUtil {
     if (appDesc instanceof TaskApplicationDescriptorImpl) {
       return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory();
     } else if (appDesc instanceof StreamApplicationDescriptorImpl) {
-      return (StreamTaskFactory) () -> new StreamOperatorTask(((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph(),
-          ((StreamApplicationDescriptorImpl) appDesc).getContextManager());
+      return (StreamTaskFactory) () -> new StreamOperatorTask(
+          ((StreamApplicationDescriptorImpl) appDesc).getOperatorSpecGraph());
     }
     throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
         + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName()));

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
index 1cf9a9c..a91d663 100644
--- a/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/main/java/org/apache/samza/util/EmbeddedTaggedRateLimiter.java
@@ -18,21 +18,20 @@
  */
 package org.apache.samza.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
@@ -106,16 +105,15 @@ public class EmbeddedTaggedRateLimiter implements RateLimiter {
   }
 
   @Override
-  public void init(Config config, TaskContext taskContext) {
+  public void init(Context context) {
     this.tagToRateLimiterMap = Collections.unmodifiableMap(tagToTargetRateMap.entrySet().stream()
         .map(e -> {
             String tag = e.getKey();
-            int effectiveRate = e.getValue();
-            if (taskContext != null) {
-              effectiveRate /= taskContext.getSamzaContainerContext().taskNames.size();
-              LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d",
-                  taskContext.getTaskName(), tag, effectiveRate));
-            }
+            int numTasksInContainer = context.getContainerContext().getContainerModel().getTasks().keySet().size();
+            int effectiveRate = e.getValue() / numTasksInContainer;
+            TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
+            LOGGER.info(String.format("Effective rate limit for task %s and tag %s is %d", taskName, tag,
+                effectiveRate));
             return new ImmutablePair<>(tag, com.google.common.util.concurrent.RateLimiter.create(effectiveRate));
           })
         .collect(Collectors.toMap(ImmutablePair::getKey, ImmutablePair::getValue))

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3c10aae..3292986 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -43,6 +43,7 @@ import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
+import org.apache.samza.context._
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
@@ -122,9 +123,13 @@ object SamzaContainer extends Logging {
   def apply(
     containerId: String,
     jobModel: JobModel,
-    config: Config,
     customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](),
-    taskFactory: TaskFactory[_]) = {
+    taskFactory: TaskFactory[_],
+    jobContext: JobContext,
+    applicationContainerContextFactoryOption: Option[ApplicationContainerContextFactory[ApplicationContainerContext]],
+    applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
+  ) = {
+    val config = jobContext.getConfig
     val containerModel = jobModel.getContainers.get(containerId)
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
@@ -488,8 +493,10 @@ object SamzaContainer extends Logging {
       .asScala
       .map(_.getTaskName)
       .toSet
-    val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry)
 
+    val containerContext = new ContainerContextImpl(containerModel, samzaContainerMetrics.registry)
+    val applicationContainerContextOption = applicationContainerContextFactoryOption
+      .map(_.create(jobContext, containerContext))
 
     val storeWatchPaths = new util.HashSet[Path]()
 
@@ -571,6 +578,7 @@ object SamzaContainer extends Logging {
               collector,
               taskInstanceMetrics.registry,
               changeLogSystemStreamPartition,
+              jobContext,
               containerContext)
             (storeName, storageEngine)
         }
@@ -635,13 +643,11 @@ object SamzaContainer extends Logging {
 
       def createTaskInstance(task: Any): TaskInstance = new TaskInstance(
           task = task,
-          taskName = taskName,
-          config = config,
+          taskModel = taskModel,
           metrics = taskInstanceMetrics,
           systemAdmins = systemAdmins,
           consumerMultiplexer = consumerMultiplexer,
           collector = collector,
-          containerContext = containerContext,
           offsetManager = offsetManager,
           storageManager = storageManager,
           tableManager = tableManager,
@@ -652,7 +658,11 @@ object SamzaContainer extends Logging {
           streamMetadataCache = streamMetadataCache,
           timerExecutor = timerExecutor,
           sideInputSSPs = taskSideInputSSPs,
-          sideInputStorageManager = sideInputStorageManager)
+          sideInputStorageManager = sideInputStorageManager,
+          jobContext = jobContext,
+          containerContext = containerContext,
+          applicationContainerContextOption = applicationContainerContextOption,
+          applicationTaskContextFactoryOption = applicationTaskContextFactoryOption)
 
       val taskInstance = createTaskInstance(task)
 
@@ -708,7 +718,7 @@ object SamzaContainer extends Logging {
     info("Samza container setup complete.")
 
     new SamzaContainer(
-      containerContext = containerContext,
+      config = config,
       taskInstances = taskInstances,
       runLoop = runLoop,
       systemAdmins = systemAdmins,
@@ -722,10 +732,11 @@ object SamzaContainer extends Logging {
       diskSpaceMonitor = diskSpaceMonitor,
       hostStatisticsMonitor = memoryStatisticsMonitor,
       taskThreadPool = taskThreadPool,
-      timerExecutor = timerExecutor)
+      timerExecutor = timerExecutor,
+      containerContext = containerContext,
+      applicationContainerContextOption = applicationContainerContextOption)
   }
 
-
   /**
     * Builds the set of SSPs for all changelogs on this container.
     */
@@ -741,7 +752,7 @@ object SamzaContainer extends Logging {
 }
 
 class SamzaContainer(
-  containerContext: SamzaContainerContext,
+  config: Config,
   taskInstances: Map[TaskName, TaskInstance],
   runLoop: Runnable,
   systemAdmins: SystemAdmins,
@@ -756,12 +767,14 @@ class SamzaContainer(
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null,
   taskThreadPool: ExecutorService = null,
-  timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor) extends Runnable with Logging {
+  timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor,
+  containerContext: ContainerContext,
+  applicationContainerContextOption: Option[ApplicationContainerContext]) extends Runnable with Logging {
 
-  val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
+  val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
   var shutdownHookThread: Thread = null
   var jmxServer: JmxServer = null
-  val isAutoCommitEnabled = containerContext.config.isAutoCommitEnabled
+  val isAutoCommitEnabled = config.isAutoCommitEnabled
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
   private var exceptionSeen: Throwable = null
@@ -789,6 +802,7 @@ class SamzaContainer(
       status = SamzaContainerStatus.STARTING
 
       jmxServer = new JmxServer()
+      applicationContainerContextOption.foreach(_.start)
 
       startMetrics
       startDiagnostics
@@ -841,6 +855,8 @@ class SamzaContainer(
       shutdownSecurityManger
       shutdownAdmins
 
+      applicationContainerContextOption.foreach(_.stop)
+
       if (!status.equals(SamzaContainerStatus.FAILED)) {
         status = SamzaContainerStatus.STOPPED
       }
@@ -930,18 +946,18 @@ class SamzaContainer(
   }
 
   def startDiagnostics {
-    if (containerContext.config.getDiagnosticsEnabled) {
+    if (config.getDiagnosticsEnabled) {
       info("Starting diagnostics.")
 
       try {
-        val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass).
+        val diagnosticsAppender = Class.forName(config.getDiagnosticsAppenderClass).
           getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
       }
       catch {
         case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
           error("Failed to instantiate diagnostic appender", e)
           throw new ConfigException("Failed to instantiate diagnostic appender class " +
-            containerContext.config.getDiagnosticsAppenderClass, e)
+            config.getDiagnosticsAppenderClass, e)
         }
       }
     }
@@ -958,24 +974,25 @@ class SamzaContainer(
   }
 
   def storeContainerLocality {
-    val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(containerContext.config).getHostAffinityEnabled
+    val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(config).getHostAffinityEnabled
     if (isHostAffinityEnabled) {
-      val localityManager: LocalityManager = new LocalityManager(containerContext.config, containerContext.metricsRegistry)
-      val containerName = "SamzaContainer-" + String.valueOf(containerContext.id)
+      val localityManager: LocalityManager = new LocalityManager(config, containerContext.getContainerMetricsRegistry)
+      val containerId = containerContext.getContainerModel.getId
+      val containerName = "SamzaContainer-" + containerId
       info("Registering %s with metadata store" format containerName)
       try {
         val hostInet = Util.getLocalHost
         val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
         val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
         info("Writing container locality and JMX address to metadata store")
-        localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName)
+        localityManager.writeContainerToHostMapping(containerId, hostInet.getHostName)
       } catch {
         case uhe: UnknownHostException =>
           warn("Received UnknownHostException when persisting locality info for container %s: " +
-            "%s" format (containerContext.id, uhe.getMessage))  //No-op
+            "%s" format (containerId, uhe.getMessage))  //No-op
         case unknownException: Throwable =>
           warn("Received an exception when persisting locality info for container %s: " +
-            "%s" format (containerContext.id, unknownException.getMessage))
+            "%s" format (containerId, unknownException.getMessage))
       } finally {
         info("Shutting down locality manager.")
         localityManager.close()
@@ -1016,7 +1033,6 @@ class SamzaContainer(
     systemAdmins.start
   }
 
-
   def startProducers {
     info("Registering task instances with producers.")
 
@@ -1092,7 +1108,6 @@ class SamzaContainer(
     systemAdmins.stop
   }
 
-
   def shutdownProducers {
     info("Shutting down producer multiplexer.")
 
@@ -1185,4 +1200,4 @@ class SamzaContainer(
       hostStatisticsMonitor.stop()
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 9f4fd17..f8e9c63 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -20,15 +20,17 @@
 package org.apache.samza.container
 
 
+import java.util.Optional
 import java.util.concurrent.ScheduledExecutorService
 
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.job.model.JobModel
+import org.apache.samza.context._
+import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.scheduler.ScheduledCallback
+import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
 import org.apache.samza.system._
@@ -36,19 +38,17 @@ import org.apache.samza.table.TableManager
 import org.apache.samza.task._
 import org.apache.samza.util.{Logging, ScalaJavaUtil}
 
-import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.Map
 
 class TaskInstance(
   val task: Any,
-  val taskName: TaskName,
-  config: Config,
+  taskModel: TaskModel,
   val metrics: TaskInstanceMetrics,
   systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
   collector: TaskInstanceCollector,
-  containerContext: SamzaContainerContext,
   val offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   tableManager: TableManager = null,
@@ -59,15 +59,23 @@ class TaskInstance(
   streamMetadataCache: StreamMetadataCache = null,
   timerExecutor : ScheduledExecutorService = null,
   sideInputSSPs: Set[SystemStreamPartition] = Set(),
-  sideInputStorageManager: TaskSideInputStorageManager = null) extends Logging {
-
+  sideInputStorageManager: TaskSideInputStorageManager = null,
+  jobContext: JobContext,
+  containerContext: ContainerContext,
+  applicationContainerContextOption: Option[ApplicationContainerContext],
+  applicationTaskContextFactoryOption: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]
+) extends Logging {
+
+  val taskName: TaskName = taskModel.getTaskName
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
-  val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
+  val epochTimeScheduler: EpochTimeScheduler = EpochTimeScheduler.create(timerExecutor)
+
+  private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
       if (storageManager != null && storageManager.getStore(storeName).isDefined) {
         storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
@@ -77,9 +85,14 @@ class TaskInstance(
         null
       }
     })
-
-  val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
-                                    kvStoreSupplier, tableManager, jobModel, streamMetadataCache, timerExecutor)
+  private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
+    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
+  // hold the application task context in its own field: accessing it through Context throws if it is null
+  private val applicationTaskContextOption = applicationTaskContextFactoryOption.map(_.create(jobContext,
+    containerContext, taskContext, applicationContainerContextOption.orNull))
+  val context = new ContextImpl(jobContext, containerContext, taskContext,
+    Optional.ofNullable(applicationContainerContextOption.orNull),
+    Optional.ofNullable(applicationTaskContextOption.orNull))
 
   // store the (ssp -> if this ssp has caught up) mapping. "caught up"
   // means the same ssp in other taskInstances have the same offset as
@@ -88,6 +101,8 @@ class TaskInstance(
     scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
   systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
 
+  private val config: Config = jobContext.getConfig
+
   val intermediateStreams: Set[String] = config.getStreamIds.filter(config.getIsIntermediateStream).toSet
 
   val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
@@ -126,7 +141,7 @@ class TaskInstance(
     if (tableManager != null) {
       debug("Starting table manager for taskName: %s" format taskName)
 
-      tableManager.init(containerContext, context)
+      tableManager.init(context)
     } else {
       debug("Skipping table manager initialization for taskName: %s" format taskName)
     }
@@ -136,10 +151,14 @@ class TaskInstance(
     if (isInitableTask) {
       debug("Initializing task for taskName: %s" format taskName)
 
-      task.asInstanceOf[InitableTask].init(config, context)
+      task.asInstanceOf[InitableTask].init(context)
     } else {
       debug("Skipping task initialization for taskName: %s" format taskName)
     }
+    applicationTaskContextOption.foreach(applicationTaskContext => {
+      debug("Starting application-defined task context for taskName: %s" format taskName)
+      applicationTaskContext.start()
+    })
   }
 
   def registerProducers {
@@ -226,7 +245,7 @@ class TaskInstance(
     trace("Scheduler for taskName: %s" format taskName)
 
     exceptionHandler.maybeHandle {
-      context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry =>
+      epochTimeScheduler.removeReadyTimers().entrySet().foreach { entry =>
         entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator)
       }
     }
@@ -266,6 +285,10 @@ class TaskInstance(
   }
 
   def shutdownTask {
+    applicationTaskContextOption.foreach(applicationTaskContext => {
+      debug("Stopping application-defined task context for taskName: %s" format taskName)
+      applicationTaskContext.stop()
+    })
     if (task.isInstanceOf[ClosableTask]) {
       debug("Shutting down stream task for taskName: %s" format taskName)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index bec4ec0..929d6a4 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -24,6 +24,7 @@ import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
 import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
+import org.apache.samza.context.JobContextImpl
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
@@ -112,9 +113,12 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       val container = SamzaContainer(
         containerId,
         jobModel,
-        config,
         Map[String, MetricsReporter](),
-        taskFactory)
+        taskFactory,
+        JobContextImpl.fromConfigWithDefaults(config),
+        Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
+        Option(appDesc.getApplicationTaskContextFactory.orElse(null))
+      )
       container.setContainerListener(containerListener)
 
       val threadJob = new ThreadJob(container)