You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/10 22:34:25 UTC

[2/5] samza git commit: SAMZA-1714: Creating shared context factory for shared context objects

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
index efe1acf..f587885 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java
@@ -19,13 +19,12 @@
 
 package org.apache.samza.table.remote;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.samza.container.SamzaContainerContext;
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
@@ -34,18 +33,22 @@ import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
 import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_WRITE_TAG;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 
 public class TestRemoteTableDescriptor {
@@ -117,16 +120,24 @@ public class TestRemoteTableDescriptor {
     desc.getTableSpec();
   }
 
-  private TaskContext createMockTaskContext() {
+  private Context createMockContext() {
+    Context context = new MockContext();
+
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
     doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
-    TaskContext taskContext = mock(TaskContext.class);
-    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
-    SamzaContainerContext containerCtx = new SamzaContainerContext(
-        "1", null, Collections.singleton(new TaskName("MyTask")), null);
-    doReturn(containerCtx).when(taskContext).getSamzaContainerContext();
-    return taskContext;
+    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+
+    TaskName taskName = new TaskName("MyTask");
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(taskName);
+    when(context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+
+    ContainerModel containerModel = mock(ContainerModel.class);
+    when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
+    when(context.getContainerContext().getContainerModel()).thenReturn(containerModel);
+
+    return context;
   }
 
   static class CountingCreditFunction<K, V> implements TableRateLimiter.CreditFunction<K, V> {
@@ -172,7 +183,7 @@ public class TestRemoteTableDescriptor {
 
     TableSpec spec = desc.getTableSpec();
     RemoteTableProvider provider = new RemoteTableProvider(spec);
-    provider.init(mock(SamzaContainerContext.class), createMockTaskContext());
+    provider.init(createMockContext());
     Table table = provider.getTable();
     Assert.assertTrue(table instanceof RemoteReadWriteTable);
     RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 9dd5a74..050ea55 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -29,19 +29,16 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.samza.container.SamzaContainerContext;
+import junit.framework.Assert;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
 import org.apache.samza.table.remote.TestRemoteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
-import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeast;
@@ -57,9 +54,8 @@ public class TestRetriableTableFunctions {
 
   public TableMetricsUtil getMetricsUtil(String tableId) {
     Table table = mock(Table.class);
-    SamzaContainerContext cntCtx = mock(SamzaContainerContext.class);
-    TaskContext taskCtx = TestRemoteTable.getMockTaskContext();
-    return new TableMetricsUtil(cntCtx, taskCtx, table, tableId);
+    Context context = TestRemoteTable.getMockContext();
+    return new TableMetricsUtil(context, table, tableId);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java b/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
index 1f71abd..13ce5f4 100644
--- a/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/IdentityStreamTask.java
@@ -20,19 +20,21 @@
 package org.apache.samza.task;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 
 
-public class IdentityStreamTask implements StreamTask , InitableTask  {
+public class IdentityStreamTask implements StreamTask, InitableTask  {
   private int processedMessageCount = 0;
   private int expectedMessageCount;
   private String outputTopic;
   private String outputSystem;
 
   @Override
-  public void init(Config config, TaskContext taskContext) throws Exception {
+  public void init(Context context) throws Exception {
+    Config config = context.getJobContext().getConfig();
     this.expectedMessageCount = config.getInt("app.messageCount");
     this.outputTopic = config.get("app.outputTopic", "output");
     this.outputSystem = config.get("app.outputSystem", "test-system");

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 9cdbfe6..be8d344 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -28,17 +28,17 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.checkpoint.OffsetManager;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskInstanceExceptionHandler;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
@@ -47,13 +47,20 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Rule;
 import org.junit.Test;
-
 import org.junit.rules.Timeout;
 import scala.Option;
 import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestAsyncRunLoop {
   // Immutable objects shared by all test methods.
@@ -77,12 +84,31 @@ public class TestAsyncRunLoop {
   private final IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(taskName);
     TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap());
     scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
-    return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics,
-        null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class),
-        manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics,
-        new scala.collection.immutable.HashSet<String>()), null, null, null, new scala.collection.immutable.HashSet<>(), null);
+    return new TaskInstance(task,
+        taskModel,
+        taskInstanceMetrics,
+        null,
+        consumers,
+        mock(TaskInstanceCollector.class),
+        manager,
+        null,
+        null,
+        null,
+        sspSet,
+        new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()),
+        null,
+        null,
+        null,
+        new scala.collection.immutable.HashSet<>(),
+        null,
+        mock(JobContext.class),
+        mock(ContainerContext.class),
+        Option.apply(null),
+        Option.apply(null));
   }
 
   interface TestCode {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
index d0b820a..0538980 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,7 +64,7 @@ public class TestAsyncStreamAdapter {
     }
 
     @Override
-    public void init(Config config, TaskContext context) throws Exception {
+    public void init(Context context) throws Exception {
       inited = true;
     }
 
@@ -95,7 +95,7 @@ public class TestAsyncStreamAdapter {
     TestCallbackListener listener = new TestCallbackListener();
     TaskCallback callback = new TaskCallbackImpl(listener, null, envelope, null, 0L, 0L);
 
-    taskAdaptor.init(null, null);
+    taskAdaptor.init(null);
     assertTrue(task.inited);
 
     taskAdaptor.processAsync(null, null, null, callback);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
index e0da2e9..da137e6 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java
@@ -19,12 +19,11 @@
 
 package org.apache.samza.task;
 
-import org.junit.Test;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index 1bc23d4..ab5e295 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -19,14 +19,17 @@
 
 package org.apache.samza.task;
 
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.util.Clock;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestStreamOperatorTask {
@@ -36,20 +39,19 @@ public class TestStreamOperatorTask {
   }
 
   @Test
-  public void testCloseDuringInitializationErrors() {
-    ContextManager mockContextManager = mock(ContextManager.class);
-    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mockContextManager);
-
-    doThrow(new RuntimeException("Failed to initialize context manager"))
-        .when(mockContextManager).init(any(), any());
-
+  public void testCloseDuringInitializationErrors() throws Exception {
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    doThrow(new RuntimeException("Failed to get config")).when(jobContext).getConfig();
+    StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class), mock(Clock.class));
     try {
-      operatorTask.init(mock(Config.class), mock(TaskContext.class));
-      operatorTask.close();
-    } catch (Exception e) {
+      operatorTask.init(context);
+    } catch (RuntimeException e) {
       if (e instanceof NullPointerException) {
         fail("Unexpected null pointer exception");
       }
     }
+    operatorTask.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
index ef606c0..8559bb3 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestEmbeddedTaggedRateLimiter.java
@@ -18,19 +18,22 @@
  */
 package org.apache.samza.util;
 
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -206,25 +209,14 @@ public class TestEmbeddedTaggedRateLimiter {
   }
 
   static void initRateLimiter(RateLimiter rateLimiter) {
-    Config config = mock(Config.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    SamzaContainerContext containerContext = mockSamzaContainerContext();
-    when(taskContext.getSamzaContainerContext()).thenReturn(containerContext);
-    rateLimiter.init(config, taskContext);
-  }
-
-  static SamzaContainerContext mockSamzaContainerContext() {
-    try {
-      Collection<String> taskNames = mock(Collection.class);
-      when(taskNames.size()).thenReturn(NUMBER_OF_TASKS);
-      SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
-      Field taskNamesField = SamzaContainerContext.class.getDeclaredField("taskNames");
-      taskNamesField.setAccessible(true);
-      taskNamesField.set(containerContext, taskNames);
-      taskNamesField.setAccessible(false);
-      return containerContext;
-    } catch (Exception ex) {
-      throw new SamzaException(ex);
-    }
+    Context context = new MockContext(mock(Config.class));
+    when(context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
+    ContainerModel containerModel = mock(ContainerModel.class);
+    Map<TaskName, TaskModel> tasks = IntStream.range(0, NUMBER_OF_TASKS)
+        .mapToObj(i -> new TaskName("task-" + i))
+        .collect(Collectors.toMap(Function.identity(), x -> mock(TaskModel.class)));
+    when(containerModel.getTasks()).thenReturn(tasks);
+    when(context.getContainerContext().getContainerModel()).thenReturn(containerModel);
+    rateLimiter.init(context);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 57c0bf0..a35366d 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -22,7 +22,8 @@ package org.apache.samza.container
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{Config, MapConfig}
+import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
@@ -46,7 +47,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   private val TASK_NAME = new TaskName("taskName")
 
   @Mock
-  private var containerContext: SamzaContainerContext = null
+  private var config: Config = null
   @Mock
   private var taskInstance: TaskInstance = null
   @Mock
@@ -60,6 +61,10 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Mock
   private var metrics: SamzaContainerMetrics = null
   @Mock
+  private var containerContext: ContainerContext = null
+  @Mock
+  private var applicationContainerContext: ApplicationContainerContext = null
+  @Mock
   private var samzaContainerListener: SamzaContainerListener = null
 
   private var samzaContainer: SamzaContainer = null
@@ -67,15 +72,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Before
   def setup(): Unit = {
     MockitoAnnotations.initMocks(this)
-    this.samzaContainer = new SamzaContainer(
-      this.containerContext,
-      Map(TASK_NAME -> this.taskInstance),
-      this.runLoop,
-      this.systemAdmins,
-      this.consumerMultiplexer,
-      this.producerMultiplexer,
-      metrics)
-    this.samzaContainer.setContainerListener(this.samzaContainerListener)
+    setupSamzaContainer(Some(this.applicationContainerContext))
     when(this.metrics.containerStartupTime).thenReturn(mock[Timer])
   }
 
@@ -173,6 +170,24 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
+  def testApplicationContainerContext() {
+    val orderVerifier = inOrder(this.applicationContainerContext, this.runLoop)
+    this.samzaContainer.run
+    orderVerifier.verify(this.applicationContainerContext).start()
+    orderVerifier.verify(this.runLoop).run()
+    orderVerifier.verify(this.applicationContainerContext).stop()
+  }
+
+  @Test
+  def testNullApplicationContainerContextFactory() {
+    setupSamzaContainer(None)
+    this.samzaContainer.run
+    verify(this.runLoop).run()
+    // applicationContainerContext is not even wired into the container anymore, but just double check it is not used
+    verifyZeroInteractions(this.applicationContainerContext)
+  }
+
+  @Test
   def testReadJobModel() {
     val config = new MapConfig(Map("a" -> "b").asJava)
     val offsets = new util.HashMap[SystemStreamPartition, String]()
@@ -258,6 +273,20 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     assertEquals(Set(), SamzaContainer.getChangelogSSPsForContainer(containerModel, Map()))
   }
 
+  private def setupSamzaContainer(applicationContainerContext: Option[ApplicationContainerContext]) {
+    this.samzaContainer = new SamzaContainer(
+      this.config,
+      Map(TASK_NAME -> this.taskInstance),
+      this.runLoop,
+      this.systemAdmins,
+      this.consumerMultiplexer,
+      this.producerMultiplexer,
+      metrics,
+      containerContext = this.containerContext,
+      applicationContainerContextOption = applicationContainerContext)
+    this.samzaContainer.setContainerListener(this.samzaContainerListener)
+  }
+
   class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) {
     var exceptionCount = 0
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index b196131..15534cd 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -22,7 +22,8 @@ package org.apache.samza.container
 
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
-import org.apache.samza.config.Config
+import org.apache.samza.context.{TaskContext => _, _}
+import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.Counter
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemConsumers, SystemStream, _}
@@ -34,11 +35,12 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{Matchers, Mock, MockitoAnnotations}
+import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
 
-class TestTaskInstance extends MockitoSugar {
+class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
   private val SYSTEM_NAME = "test-system"
   private val TASK_NAME = new TaskName("taskName")
   private val SYSTEM_STREAM_PARTITION =
@@ -48,7 +50,7 @@ class TestTaskInstance extends MockitoSugar {
   @Mock
   private var task: AllTask = null
   @Mock
-  private var config: Config = null
+  private var taskModel: TaskModel = null
   @Mock
   private var metrics: TaskInstanceMetrics = null
   @Mock
@@ -60,13 +62,21 @@ class TestTaskInstance extends MockitoSugar {
   @Mock
   private var collector: TaskInstanceCollector = null
   @Mock
-  private var containerContext: SamzaContainerContext = null
-  @Mock
   private var offsetManager: OffsetManager = null
   @Mock
   private var taskStorageManager: TaskStorageManager = null
   // not a mock; using MockTaskInstanceExceptionHandler
   private var taskInstanceExceptionHandler: MockTaskInstanceExceptionHandler = null
+  @Mock
+  private var jobContext: JobContext = null
+  @Mock
+  private var containerContext: ContainerContext = null
+  @Mock
+  private var applicationContainerContext: ApplicationContainerContext = null
+  @Mock
+  private var applicationTaskContextFactory: ApplicationTaskContextFactory[ApplicationTaskContext] = null
+  @Mock
+  private var applicationTaskContext: ApplicationTaskContext = null
 
   private var taskInstance: TaskInstance = null
 
@@ -75,19 +85,12 @@ class TestTaskInstance extends MockitoSugar {
     MockitoAnnotations.initMocks(this)
     // not using Mockito mock since Mockito doesn't work well with the call-by-name argument in maybeHandle
     this.taskInstanceExceptionHandler = new MockTaskInstanceExceptionHandler
-    this.taskInstance = new TaskInstance(this.task,
-      TASK_NAME,
-      this.config,
-      this.metrics,
-      this.systemAdmins,
-      this.consumerMultiplexer,
-      this.collector,
-      this.containerContext,
-      this.offsetManager,
-      storageManager = this.taskStorageManager,
-      systemStreamPartitions = SYSTEM_STREAM_PARTITIONS,
-      exceptionHandler = this.taskInstanceExceptionHandler)
+    when(this.taskModel.getTaskName).thenReturn(TASK_NAME)
+    when(this.applicationTaskContextFactory.create(Matchers.eq(this.jobContext), Matchers.eq(this.containerContext),
+      any(), Matchers.eq(this.applicationContainerContext)))
+      .thenReturn(this.applicationTaskContext)
     when(this.systemAdmins.getSystemAdmin(SYSTEM_NAME)).thenReturn(this.systemAdmin)
+    setupTaskInstance(Some(this.applicationTaskContextFactory))
   }
 
   @Test
@@ -133,10 +136,10 @@ class TestTaskInstance extends MockitoSugar {
    */
   @Test
   def testManualOffsetReset() {
-    when(this.task.init(any(), any())).thenAnswer(new Answer[Void] {
+    when(this.task.init(any())).thenAnswer(new Answer[Void] {
       override def answer(invocation: InvocationOnMock): Void = {
-        val taskContext = invocation.getArgumentAt(1, classOf[TaskContext])
-        taskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10")
+        val context = invocation.getArgumentAt(0, classOf[Context])
+        context.getTaskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10")
         null
       }
     })
@@ -198,6 +201,35 @@ class TestTaskInstance extends MockitoSugar {
     verify(commitsCounter).inc()
   }
 
+  /**
+    * Given that an application task context factory is provided, then lifecycle calls should be made and the context
+    * should be accessible.
+    */
+  @Test
+  def testApplicationTaskContextFactoryProvided(): Unit = {
+    assertEquals(this.applicationTaskContext, this.taskInstance.context.getApplicationTaskContext)
+    this.taskInstance.initTask
+    verify(this.applicationTaskContext).start()
+    verify(this.applicationTaskContext, never()).stop()
+    this.taskInstance.shutdownTask
+    verify(this.applicationTaskContext).stop()
+  }
+
+  /**
+    * Given that no application task context factory is provided, then no lifecycle calls should be made. Also, an
+    * exception should be thrown if the application task context is accessed.
+    */
+  @Test
+  def testNoApplicationTaskContextFactoryProvided() {
+    setupTaskInstance(None)
+    this.taskInstance.initTask
+    this.taskInstance.shutdownTask
+    verifyZeroInteractions(this.applicationTaskContext)
+    intercept[IllegalStateException] {
+      this.taskInstance.context.getApplicationTaskContext
+    }
+  }
+
   @Test(expected = classOf[SystemProducerException])
   def testProducerExceptionsIsPropagated() {
     when(this.metrics.commits).thenReturn(mock[Counter])
@@ -210,6 +242,24 @@ class TestTaskInstance extends MockitoSugar {
     }
   }
 
+  private def setupTaskInstance(
+    applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = {
+    this.taskInstance = new TaskInstance(this.task,
+      this.taskModel,
+      this.metrics,
+      this.systemAdmins,
+      this.consumerMultiplexer,
+      this.collector,
+      offsetManager = this.offsetManager,
+      storageManager = this.taskStorageManager,
+      systemStreamPartitions = SYSTEM_STREAM_PARTITIONS,
+      exceptionHandler = this.taskInstanceExceptionHandler,
+      jobContext = this.jobContext,
+      containerContext = this.containerContext,
+      applicationContainerContextOption = Some(this.applicationContainerContext),
+      applicationTaskContextFactoryOption = applicationTaskContextFactory)
+  }
+
   /**
     * Task type which has all task traits, which can be mocked.
     */

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 0b951f4..59f8662 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -18,21 +18,25 @@
  */
 package org.apache.samza.processor
 
-import java.util.Collections
+import java.util
 
+import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
 import org.apache.samza.container._
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.context.{ContainerContext, JobContext}
+import org.apache.samza.job.model.TaskModel
 import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system._
+import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
+import org.mockito.Mockito
 
 
 object StreamProcessorTestUtils {
   def getDummyContainer(mockRunloop: RunLoop, streamTask: StreamTask) = {
-    val config = new MapConfig
+    val config = new MapConfig()
     val taskName = new TaskName("taskName")
+    val taskModel = new TaskModel(taskName, new util.HashSet[SystemStreamPartition](), new Partition(0))
     val adminMultiplexer = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
@@ -41,26 +45,29 @@ object StreamProcessorTestUtils {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext("0", config, Collections.singleton[TaskName](taskName), new MetricsRegistryMap)
+    val containerContext = Mockito.mock(classOf[ContainerContext])
     val taskInstance: TaskInstance = new TaskInstance(
       streamTask,
-      taskName,
-      config,
+      taskModel,
       new TaskInstanceMetrics,
-      null,
+      adminMultiplexer,
       consumerMultiplexer,
       collector,
-      containerContext
-    )
+      jobContext = Mockito.mock(classOf[JobContext]),
+      containerContext = containerContext,
+      applicationContainerContextOption = None,
+      applicationTaskContextFactoryOption = None)
 
     val container = new SamzaContainer(
-      containerContext = containerContext,
+      config = config,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunloop,
       systemAdmins = adminMultiplexer,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics)
+      metrics = new SamzaContainerMetrics,
+      containerContext = containerContext,
+      applicationContainerContextOption = None)
     container
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
index 53147ad..e30328a 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStorageEngineFactory.scala
@@ -21,18 +21,19 @@ package org.apache.samza.storage.kv.inmemory
 
 import java.io.File
 
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.context.{ContainerContext, JobContext}
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.storage.kv.{KeyValueStoreMetrics, BaseKeyValueStorageEngineFactory, KeyValueStore}
+import org.apache.samza.storage.kv.{BaseKeyValueStorageEngineFactory, KeyValueStore, KeyValueStoreMetrics}
 import org.apache.samza.system.SystemStreamPartition
 
 class InMemoryKeyValueStorageEngineFactory[K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
 
   override def getKVStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: SystemStreamPartition,
-                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    jobContext: JobContext,
+    containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
     val metrics = new KeyValueStoreMetrics(storeName, registry)
     val inMemoryDb = new InMemoryKeyValueStore (metrics)
     inMemoryDb

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index 9dca23c..0734fe6 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -19,16 +19,11 @@
 
 package org.apache.samza.storage.kv;
 
-import java.util.ArrayList;
-
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSerializerConfig;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.SerializerConfig$;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.util.Util;
@@ -65,11 +60,7 @@ public class RocksDbKeyValueReader {
     valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName), serializerConfig);
 
     // get db options
-    ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
-    taskNameList.add(new TaskName("read-rocks-db"));
-    SamzaContainerContext samzaContainerContext =
-        new SamzaContainerContext("0",  config, taskNameList, new MetricsRegistryMap());
-    Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
+    Options options = RocksDbOptionsHelper.options(config, 1);
 
     // open the db
     RocksDB.loadLibrary();

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 9389681..7beb066 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -20,7 +20,8 @@
 package org.apache.samza.storage.kv;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -41,12 +42,11 @@ public class RocksDbOptionsHelper {
   private static final String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
   private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
 
-  public static Options options(Config storeConfig, SamzaContainerContext containerContext) {
+  public static Options options(Config storeConfig, int numTasksForContainer) {
     Options options = new Options();
     Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
     // Cache size and write buffer size are specified on a per-container basis.
-    int numTasks = containerContext.taskNames.size();
-    options.setWriteBufferSize((int) (writeBufSize / numTasks));
+    options.setWriteBufferSize((int) (writeBufSize / numTasksForContainer));
 
     CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
     String compressionInConfig = storeConfig.get(ROCKSDB_COMPRESSION, "snappy");
@@ -75,7 +75,7 @@ public class RocksDbOptionsHelper {
     }
     options.setCompressionType(compressionType);
 
-    long blockCacheSize = getBlockCacheSize(storeConfig, containerContext);
+    long blockCacheSize = getBlockCacheSize(storeConfig, numTasksForContainer);
     int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
     BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
     tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize);
@@ -109,9 +109,8 @@ public class RocksDbOptionsHelper {
     return options;
   }
 
-  public static Long getBlockCacheSize(Config storeConfig, SamzaContainerContext containerContext) {
-    int numTasks = containerContext.taskNames.size();
+  public static Long getBlockCacheSize(Config storeConfig, int numTasksForContainer) {
     long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
-    return cacheSize / numTasks;
+    return cacheSize / numTasksForContainer;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index 2b7ffb5..704af4a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -20,11 +20,12 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
-import org.apache.samza.container.SamzaContainerContext
+
+import org.apache.samza.config.StorageConfig._
+import org.apache.samza.context.{ContainerContext, JobContext}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamPartition
 import org.rocksdb.{FlushOptions, WriteOptions}
-import org.apache.samza.config.StorageConfig._
 
 class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] {
   /**
@@ -37,17 +38,19 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
    * @return A valid KeyValueStore instance
    */
   override def getKVStore(storeName: String,
-                          storeDir: File,
-                          registry: MetricsRegistry,
-                          changeLogSystemStreamPartition: SystemStreamPartition,
-                          containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
-    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
-    val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    jobContext: JobContext,
+    containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]] = {
+    val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true)
+    val isLoggedStore = jobContext.getConfig.getChangelogStream(storeName).isDefined
     val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
+    val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size()
     rocksDbMetrics.newGauge("rocksdb.block-cache-size",
-      () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, containerContext))
+      () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, numTasksForContainer))
 
-    val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
+    val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, numTasksForContainer)
     val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
     val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
     val rocksDb = new RocksDbKeyValueStore(

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
index 35a66e8..cd7e85c 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
@@ -18,14 +18,13 @@
  */
 package org.apache.samza.storage.kv;
 
+import junit.framework.Assert;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 
 public class TestRocksDbTableDescriptor {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
index 8231905..e56c977 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
@@ -18,11 +18,11 @@
  */
 package org.apache.samza.storage.kv;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -31,15 +31,12 @@ import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.utils.BaseTableProvider;
 import org.apache.samza.table.utils.SerdeUtils;
-import org.apache.samza.task.TaskContext;
-
-import com.google.common.base.Preconditions;
 
 
 /**
@@ -59,13 +56,12 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
   }
 
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-
-    super.init(containerContext, taskContext);
+  public void init(Context context) {
+    super.init(context);
 
-    Preconditions.checkNotNull(this.taskContext, "Must specify task context for local tables.");
+    Preconditions.checkNotNull(this.context, "Must specify context for local tables.");
 
-    kvStore = (KeyValueStore) taskContext.getStore(tableSpec.getId());
+    kvStore = (KeyValueStore) this.context.getTaskContext().getStore(tableSpec.getId());
 
     if (kvStore == null) {
       throw new SamzaException(String.format(
@@ -81,7 +77,7 @@ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvide
       throw new SamzaException("Store not initialized for table " + tableSpec.getId());
     }
     ReadableTable table = new LocalStoreBackedReadWriteTable(tableSpec.getId(), kvStore);
-    table.init(containerContext, taskContext);
+    table.init(this.context);
     return table;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
index 9eeb55e..804df43 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -20,11 +20,9 @@ package org.apache.samza.storage.kv;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.DefaultTableWriteMetrics;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -51,9 +49,9 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    super.init(containerContext, taskContext);
-    writeMetrics = new DefaultTableWriteMetrics(containerContext, taskContext, this, tableId);
+  public void init(Context context) {
+    super.init(context);
+    writeMetrics = new DefaultTableWriteMetrics(context, this, tableId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
index d0629c4..d440d42 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
@@ -18,15 +18,13 @@
  */
 package org.apache.samza.storage.kv;
 
+import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
-import com.google.common.base.Preconditions;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.utils.DefaultTableReadMetrics;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -58,8 +56,8 @@ public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V>
    * {@inheritDoc}
    */
   @Override
-  public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
-    readMetrics = new DefaultTableReadMetrics(containerContext, taskContext, this, tableId);
+  public void init(Context context) {
+    readMetrics = new DefaultTableReadMetrics(context, this, tableId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index da80560..d962e93 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -22,14 +22,14 @@ package org.apache.samza.storage.kv
 import java.io.File
 
 import org.apache.samza.SamzaException
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.context.{ContainerContext, JobContext}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.Serde
 import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties}
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.MessageCollector
-import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil}
+import org.apache.samza.util.HighResolutionClock
 
 /**
  * A key value storage engine factory implementation
@@ -52,11 +52,12 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
    * @param containerContext Information about the container in which the task is executing.
    * @return A valid KeyValueStore instance
    */
-  def getKVStore( storeName: String,
-                  storeDir: File,
-                  registry: MetricsRegistry,
-                  changeLogSystemStreamPartition: SystemStreamPartition,
-                  containerContext: SamzaContainerContext): KeyValueStore[Array[Byte], Array[Byte]]
+  def getKVStore(storeName: String,
+    storeDir: File,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    jobContext: JobContext,
+    containerContext: ContainerContext): KeyValueStore[Array[Byte], Array[Byte]]
 
   /**
    * Constructs a key-value StorageEngine and returns it to the caller
@@ -70,15 +71,16 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
    * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
    * @param containerContext Information about the container in which the task is executing.
    **/
-  def getStorageEngine( storeName: String,
-                        storeDir: File,
-                        keySerde: Serde[K],
-                        msgSerde: Serde[V],
-                        collector: MessageCollector,
-                        registry: MetricsRegistry,
-                        changeLogSystemStreamPartition: SystemStreamPartition,
-                        containerContext: SamzaContainerContext): StorageEngine = {
-    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
+  def getStorageEngine(storeName: String,
+    storeDir: File,
+    keySerde: Serde[K],
+    msgSerde: Serde[V],
+    collector: MessageCollector,
+    registry: MetricsRegistry,
+    changeLogSystemStreamPartition: SystemStreamPartition,
+    jobContext: JobContext,
+    containerContext: ContainerContext): StorageEngine = {
+    val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true)
     val storeFactory = storageConfig.get("factory")
     var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder()
     val accessLog = storageConfig.getBoolean("accesslog.enabled", false)
@@ -106,7 +108,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       throw new SamzaException("Must define a message serde when using key value storage.")
     }
 
-    val rawStore = getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, containerContext)
+    val rawStore =
+      getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, jobContext, containerContext)
 
     // maybe wrap with logging
     val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
@@ -141,7 +144,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     // create the storage engine and return
     // TODO: Decide if we should use raw bytes when restoring
     val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
-    val clock = if (containerContext.config.getMetricsTimerEnabled) {
+    val clock = if (jobContext.getConfig.getMetricsTimerEnabled) {
       new HighResolutionClock {
         override def nanoTime(): Long = System.nanoTime()
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
index 2b0166c..399f9fd 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseLocalStoreBackedTableProvider.java
@@ -28,33 +28,33 @@ import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestBaseLocalStoreBackedTableProvider {
 
   @Test
   public void testInit() {
-    StorageEngine store = mock(KeyValueStorageEngine.class);
-    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+    Context context = mock(Context.class);
     TaskContext taskContext = mock(TaskContext.class);
-    when(taskContext.getStore(any())).thenReturn(store);
-    when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+    when(context.getTaskContext()).thenReturn(taskContext);
+    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
+    when(taskContext.getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
 
     TableSpec tableSpec = mock(TableSpec.class);
     when(tableSpec.getId()).thenReturn("t1");
 
     TableProvider tableProvider = createTableProvider(tableSpec);
-    tableProvider.init(containerContext, taskContext);
+    tableProvider.init(context);
     Assert.assertNotNull(tableProvider.getTable());
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java
new file mode 100644
index 0000000..6841e15
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.runner;
+
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.sql.translator.TranslatorContext;
+
+
+public class SamzaSqlApplicationContext implements ApplicationTaskContext {
+  private final TranslatorContext translatorContext;
+
+  public SamzaSqlApplicationContext(TranslatorContext translatorContext) {
+    this.translatorContext = translatorContext;
+  }
+
+  public TranslatorContext getTranslatorContext() {
+    return translatorContext;
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void stop() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
index f33c5ca..77a24f8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/FilterTranslator.java
@@ -21,14 +21,13 @@ package org.apache.samza.sql.translator;
 
 import java.util.Arrays;
 import java.util.Collections;
-
 import org.apache.calcite.rel.logical.LogicalFilter;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +52,8 @@ class FilterTranslator {
     }
 
     @Override
-    public void init(Config config, TaskContext context) {
-      this.context = (TranslatorContext) context.getUserContext();
+    public void init(Context context) {
+      this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
       this.filter = (LogicalFilter) this.context.getRelNode(filterId);
       this.expr = this.context.getExpressionCompiler().compile(filter.getInputs(), Collections.singletonList(filter.getCondition()));
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
index 965338f..435a2cc 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.TableModify;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
@@ -39,8 +39,8 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.table.Table;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -70,9 +70,10 @@ class ModifyTranslator {
     }
 
     @Override
-    public void init(Config config, TaskContext taskContext) {
-      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
-      this.samzaMsgConverter = context.getMsgConverter(outputTopic);
+    public void init(Context context) {
+      TranslatorContext translatorContext =
+          ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
+      this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
index 8e6f687..9a1ff84 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ProjectTranslator.java
@@ -31,12 +31,12 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +61,8 @@ class ProjectTranslator {
     }
 
     @Override
-    public void init(Config config, TaskContext taskContext) {
-      this.context = (TranslatorContext) taskContext.getUserContext();
+    public void init(Context context) {
+      this.context = ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
       this.project = (Project) this.context.getRelNode(projectId);
       this.expr = this.context.getExpressionCompiler().compile(project.getInputs(), project.getProjects());
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 3a35b97..b13043f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -31,16 +31,17 @@ import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.ContextManager;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
+import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.planner.QueryPlanner;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
-import org.apache.samza.task.TaskContext;
-
 
 /**
  * This class is used to populate the {@link StreamApplicationDescriptor} using the SQL queries.
@@ -72,8 +73,8 @@ public class QueryTranslator {
 
   public void translate(RelRoot relRoot, StreamApplicationDescriptor appDesc) {
     final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
-    final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
-    final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
+    final TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
+    final SqlIOResolver ioResolver = translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();
     final RelNode node = relRoot.project();
 
     node.accept(new RelShuttleImpl() {
@@ -93,28 +94,28 @@ public class QueryTranslator {
           throw new SamzaException("Not a supported operation: " + modify.toString());
         }
         RelNode node = super.visit(modify);
-        modifyTranslator.translate(modify, context);
+        modifyTranslator.translate(modify, translatorContext);
         return node;
       }
 
       @Override
       public RelNode visit(TableScan scan) {
         RelNode node = super.visit(scan);
-        scanTranslator.translate(scan, context);
+        scanTranslator.translate(scan, translatorContext);
         return node;
       }
 
       @Override
       public RelNode visit(LogicalFilter filter) {
         RelNode node = visitChild(filter, 0, filter.getInput());
-        new FilterTranslator().translate(filter, context);
+        new FilterTranslator().translate(filter, translatorContext);
         return node;
       }
 
       @Override
       public RelNode visit(LogicalProject project) {
         RelNode node = super.visit(project);
-        new ProjectTranslator().translate(project, context);
+        new ProjectTranslator().translate(project, translatorContext);
         return node;
       }
 
@@ -122,7 +123,7 @@ public class QueryTranslator {
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
         joinId++;
-        new JoinTranslator(joinId, ioResolver).translate(join, context);
+        new JoinTranslator(joinId, ioResolver).translate(join, translatorContext);
         return node;
       }
 
@@ -130,23 +131,21 @@ public class QueryTranslator {
       public RelNode visit(LogicalAggregate aggregate) {
         RelNode node = super.visit(aggregate);
         windowId++;
-        new LogicalAggregateTranslator(windowId).translate(aggregate, context);
+        new LogicalAggregateTranslator(windowId).translate(aggregate, translatorContext);
         return node;
       }
     });
 
-    appDesc.withContextManager(new ContextManager() {
-      @Override
-      public void init(Config config, TaskContext taskContext) {
-        taskContext.setUserContext(context.clone());
-      }
-
-      @Override
-      public void close() {
-
-      }
-
-    });
-
+    /*
+     * TODO When serialization of ApplicationDescriptor is actually needed, then something will need to be updated here,
+     * since translatorContext is not Serializable. Currently, a new ApplicationDescriptor instance is created in each
+     * container, so it does not need to be serialized. Therefore, the translatorContext is recreated in each container
+     * and does not need to be serialized.
+     */
+    appDesc.withApplicationTaskContextFactory((jobContext,
+        containerContext,
+        taskContext,
+        applicationContainerContext) ->
+        new SamzaSqlApplicationContext(translatorContext.clone()));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 771a5d5..be94160 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.commons.lang.Validate;
 import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
@@ -34,8 +34,8 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 
 
 /**
@@ -64,9 +64,10 @@ class ScanTranslator {
     }
 
     @Override
-    public void init(Config config, TaskContext taskContext) {
-      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
-      this.msgConverter = context.getMsgConverter(streamName);
+    public void init(Context context) {
+      TranslatorContext translatorContext =
+          ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContext();
+      this.msgConverter = translatorContext.getMsgConverter(streamName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
index 2005c21..ec0a993 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/e2e/TestSamzaSqlTable.java
@@ -22,7 +22,6 @@ package org.apache.samza.sql.e2e;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index 1ac804e..f0df3a9 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -20,14 +20,12 @@
 package org.apache.samza.sql.runner;
 
 import java.util.Map;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
-
 import org.junit.Test;
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 458196f..fd811cd 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.samza.config.Config;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 8318e8a..4c78b5a 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.BaseTableDescriptor;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
index a84f347..dd98b92 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -23,8 +23,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.List;
-
-import org.apache.samza.sql.testutil.SqlFileParser;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
index e7c2195..07ebe33 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestFilterTranslator.java
@@ -21,14 +21,11 @@ package org.apache.samza.sql.translator;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -38,6 +35,7 @@ import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -50,8 +48,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -96,11 +94,9 @@ public class TestFilterTranslator extends TranslatorTestBase {
     assertEquals(filterSpec.getOpCode(), OperatorSpec.OpCode.FILTER);
 
     // Verify that the describe() method will establish the context for the filter function
-    Config mockConfig = mock(Config.class);
-    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
-        new HashSet<>(), null, null, null, null, null, null);
-    taskContext.setUserContext(mockContext);
-    filterSpec.getTransformFn().init(mockConfig, taskContext);
+    Context context = mock(Context.class);
+    when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext));
+    filterSpec.getTransformFn().init(context);
     FilterFunction filterFn = (FilterFunction) Whitebox.getInternalState(filterSpec, "filterFn");
     assertNotNull(filterFn);
     assertEquals(mockContext, Whitebox.getInternalState(filterFn, "context"));

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
index 3046c1f..2ed7a00 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
@@ -21,7 +21,6 @@ package org.apache.samza.sql.translator;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.rel.RelNode;
@@ -33,9 +32,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
 import org.apache.calcite.util.Pair;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.MapFunction;
@@ -47,6 +44,7 @@ import org.apache.samza.sql.data.Expression;
 import org.apache.samza.sql.data.RexToJavaCompiler;
 import org.apache.samza.sql.data.SamzaSqlExecutionContext;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -58,8 +56,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -114,11 +112,9 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
 
     // Verify that the bootstrap() method will establish the context for the map function
-    Config mockConfig = mock(Config.class);
-    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
-        new HashSet<>(), null, null, null, null, null, null);
-    taskContext.setUserContext(mockContext);
-    projectSpec.getTransformFn().init(mockConfig, taskContext);
+    Context context = mock(Context.class);
+    when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext));
+    projectSpec.getTransformFn().init(context);
     MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
     assertNotNull(mapFn);
     assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
@@ -249,11 +245,9 @@ public class TestProjectTranslator extends TranslatorTestBase {
     assertEquals(projectSpec.getOpCode(), OperatorSpec.OpCode.MAP);
 
     // Verify that the describe() method will establish the context for the map function
-    Config mockConfig = mock(Config.class);
-    TaskContextImpl taskContext = new TaskContextImpl(new TaskName("Partition-1"), null, null,
-        new HashSet<>(), null, null, null, null, null, null);
-    taskContext.setUserContext(mockContext);
-    projectSpec.getTransformFn().init(mockConfig, taskContext);
+    Context context = mock(Context.class);
+    when(context.getApplicationTaskContext()).thenReturn(new SamzaSqlApplicationContext(mockContext));
+    projectSpec.getTransformFn().init(context);
     MapFunction mapFn = (MapFunction) Whitebox.getInternalState(projectSpec, "mapFn");
     assertNotNull(mapFn);
     assertEquals(mockContext, Whitebox.getInternalState(mapFn, "context"));
@@ -285,5 +279,4 @@ public class TestProjectTranslator extends TranslatorTestBase {
     }});
 
   }
-
 }