You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/10 22:34:26 UTC

[3/5] samza git commit: SAMZA-1714: Creating shared context factory for shared context objects

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
index 84f5dbb..de16ef2 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
@@ -19,16 +19,17 @@
 package org.apache.samza.application;
 
 import com.google.common.collect.ImmutableList;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
 import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
@@ -521,11 +522,35 @@ public class TestStreamApplicationDescriptorImpl {
   }
 
   @Test
-  public void testContextManager() {
-    ContextManager cntxMan = mock(ContextManager.class);
-    StreamApplication testApp = appDesc -> appDesc.withContextManager(cntxMan);
+  public void testApplicationContainerContextFactory() {
+    ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
+    StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.of(factory), appSpec.getApplicationContainerContextFactory());
+  }
+
+  @Test
+  public void testNoApplicationContainerContextFactory() {
+    StreamApplication testApp = appDesc -> {
+    };
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.empty(), appSpec.getApplicationContainerContextFactory());
+  }
+
+  @Test
+  public void testApplicationTaskContextFactory() {
+    ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
+    StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.of(factory), appSpec.getApplicationTaskContextFactory());
+  }
+
+  @Test
+  public void testNoApplicationTaskContextFactory() {
+    StreamApplication testApp = appDesc -> {
+    };
     StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
-    assertEquals(appSpec.getContextManager(), cntxMan);
+    assertEquals(Optional.empty(), appSpec.getApplicationTaskContextFactory());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
index abe5ce1..e79e25b 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
@@ -21,10 +21,12 @@ package org.apache.samza.application;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
 import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
 import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
@@ -127,13 +129,35 @@ public class TestTaskApplicationDescriptorImpl {
   }
 
   @Test
-  public void testContextManager() {
-    ContextManager cntxMan = mock(ContextManager.class);
+  public void testApplicationContainerContextFactory() {
+    ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
+    TaskApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
+    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.of(factory), appSpec.getApplicationContainerContextFactory());
+  }
+
+  @Test
+  public void testNoApplicationContainerContextFactory() {
     TaskApplication testApp = appDesc -> {
-      appDesc.withContextManager(cntxMan);
     };
-    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
-    assertEquals(appDesc.getContextManager(), cntxMan);
+    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.empty(), appSpec.getApplicationContainerContextFactory());
+  }
+
+  @Test
+  public void testApplicationTaskContextFactory() {
+    ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
+    TaskApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
+    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.of(factory), appSpec.getApplicationTaskContextFactory());
+  }
+
+  @Test
+  public void testNoApplicationTaskContextFactory() {
+    TaskApplication testApp = appDesc -> {
+    };
+    TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(Optional.empty(), appSpec.getApplicationTaskContextFactory());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/MockContext.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/MockContext.java b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
new file mode 100644
index 0000000..778d486
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.context;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+
+import static org.mockito.Mockito.*;
+
+
+public class MockContext implements Context {
+  private final JobContext jobContext = mock(JobContext.class);
+  private final ContainerContext containerContext = mock(ContainerContext.class);
+  /**
+   * Mocked as {@link TaskContextImpl}, not the interface, because some tests need more than the interface offers.
+   */
+  private final TaskContextImpl taskContext = mock(TaskContextImpl.class);
+  private final ApplicationContainerContext applicationContainerContext = mock(ApplicationContainerContext.class);
+  private final ApplicationTaskContext applicationTaskContext = mock(ApplicationTaskContext.class);
+
+  public MockContext() {
+    this(new MapConfig());
+  }
+
+  /**
+   * @param config returned by {@code getJobContext().getConfig()}; wired in here because config is widely used
+   */
+  public MockContext(Config config) {
+    when(this.jobContext.getConfig()).thenReturn(config);
+  }
+
+  @Override
+  public JobContext getJobContext() {
+    return jobContext;
+  }
+
+  @Override
+  public ContainerContext getContainerContext() {
+    return containerContext;
+  }
+
+  @Override
+  public TaskContext getTaskContext() {
+    return taskContext;
+  }
+
+  @Override
+  public ApplicationContainerContext getApplicationContainerContext() {
+    return applicationContainerContext;
+  }
+
+  @Override
+  public ApplicationTaskContext getApplicationTaskContext() {
+    return applicationTaskContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
index 33ad3a5..40526db 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
@@ -18,9 +18,11 @@
  */
 package org.apache.samza.context;
 
+import java.util.Optional;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 
 public class TestContextImpl {
@@ -63,11 +65,17 @@ public class TestContextImpl {
   }
 
   private static Context buildWithApplicationContainerContext(ApplicationContainerContext applicationContainerContext) {
-    return new ContextImpl(null, null, null, applicationContainerContext, null);
+    return buildWithApplicationContext(applicationContainerContext, mock(ApplicationTaskContext.class));
   }
 
   private static Context buildWithApplicationTaskContext(ApplicationTaskContext applicationTaskContext) {
-    return new ContextImpl(null, null, null, null, applicationTaskContext);
+    return buildWithApplicationContext(mock(ApplicationContainerContext.class), applicationTaskContext);
+  }
+
+  private static Context buildWithApplicationContext(ApplicationContainerContext applicationContainerContext,
+      ApplicationTaskContext applicationTaskContext) {
+    return new ContextImpl(mock(JobContext.class), mock(ContainerContext.class), mock(TaskContext.class),
+        Optional.ofNullable(applicationContainerContext), Optional.ofNullable(applicationTaskContext));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 78f886c..3d3803b 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -34,6 +34,7 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -62,7 +63,7 @@ public class TestTaskContextImpl {
     MockitoAnnotations.initMocks(this);
     taskContext =
         new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
-            offsetManager);
+            offsetManager, null, null);
     when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
   }
 
@@ -95,4 +96,16 @@ public class TestTaskContextImpl {
     taskContext.setStartingOffset(ssp, "123");
     verify(offsetManager).setStartingOffset(TASK_NAME, ssp, "123");
   }
+
+  /**
+   * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject
+   * should return null.
+   */
+  @Test
+  public void testRegisterAndFetchObject() {
+    String value = "hello world";
+    taskContext.registerObject("key", value);
+    assertEquals(value, taskContext.fetchObject("key"));
+    assertNull(taskContext.fetchObject("not a key"));
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 51a9523..4618e52 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -19,14 +19,6 @@
 package org.apache.samza.execution;
 
 import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.application.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
@@ -36,7 +28,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.BaseTableDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
@@ -52,9 +44,17 @@ import org.apache.samza.table.Table;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -445,7 +445,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
     }
 
     @Override
-    public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+    public void init(Context context) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 6fa9ed1..1315912 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -19,13 +19,14 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableSet;
-
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -40,7 +41,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.StreamTestUtils;
 import org.apache.samza.testUtils.TestClock;
@@ -56,10 +56,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.eq;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -304,22 +304,23 @@ public class TestJoinOperator {
     mapConfig.put("job.id", "jobId");
     StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream");
     StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2");
-    Config config = new MapConfig(mapConfig);
-    TaskContextImpl taskContext = mock(TaskContextImpl.class);
-    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+    Context context = new MockContext(new MapConfig(mapConfig));
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
             new SystemStreamPartition("insystem", "instream2", new Partition(0))));
-    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    when(context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+    when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     // need to return different stores for left and right side
     IntegerSerde integerSerde = new IntegerSerde();
     TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));
-    when(taskContext.getStore(eq("jobName-jobId-join-j1-L")))
+    when(context.getTaskContext().getStore(eq("jobName-jobId-join-j1-L")))
         .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
-    when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
+    when(context.getTaskContext().getStore(eq("jobName-jobId-join-j1-R")))
         .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
 
-    StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager(), clock);
-    sot.init(config, taskContext);
+    StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), clock);
+    sot.init(context);
     return sot;
   }
 
@@ -357,7 +358,7 @@ public class TestJoinOperator {
     private int numCloseCalls = 0;
 
     @Override
-    public void init(Config config, TaskContext context) {
+    public void init(Context context) {
       numInitCalls++;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 6d12d99..0ff2e0d 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -21,9 +21,9 @@ package org.apache.samza.operators.impl;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -32,8 +32,8 @@ import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.mockito.Matchers.anyLong;
@@ -46,14 +46,20 @@ import static org.mockito.Mockito.when;
 
 
 public class TestOperatorImpl {
+  private Context context;
+
+  @Before
+  public void setup() {
+    this.context = new MockContext();
+    when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
+  }
 
   @Test(expected = IllegalStateException.class)
   public void testMultipleInitShouldThrow() {
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    opImpl.init(mock(Config.class), mockTaskContext);
-    opImpl.init(mock(Config.class), mockTaskContext);
+    opImpl.init(this.context);
+    opImpl.init(this.context);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -64,24 +70,21 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnMessagePropagatesResults() {
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(mock(Config.class), mockTaskContext);
+    opImpl.init(this.context);
 
     // register a couple of operators
     OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
     when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+    mockNextOpImpl1.init(this.context);
     opImpl.registerNextOperator(mockNextOpImpl1);
 
     OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
     when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+    mockNextOpImpl2.init(this.context);
     opImpl.registerNextOperator(mockNextOpImpl2);
 
     // send a message to this operator
@@ -96,9 +99,8 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnMessageUpdatesMetrics() {
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+    when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
     when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
@@ -106,7 +108,7 @@ public class TestOperatorImpl {
 
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(mock(Config.class), mockTaskContext);
+    opImpl.init(this.context);
 
     // send a message to this operator
     MessageCollector mockCollector = mock(MessageCollector.class);
@@ -120,24 +122,21 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnTimerPropagatesResultsAndTimer() {
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(mock(Config.class), mockTaskContext);
+    opImpl.init(this.context);
 
     // register a couple of operators
     OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
     when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+    mockNextOpImpl1.init(this.context);
     opImpl.registerNextOperator(mockNextOpImpl1);
 
     OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
     when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+    mockNextOpImpl2.init(this.context);
     opImpl.registerNextOperator(mockNextOpImpl2);
 
     // send a timer tick to this operator
@@ -156,9 +155,8 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnTimerUpdatesMetrics() {
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+    when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockMessageCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
     when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter);
@@ -166,7 +164,7 @@ public class TestOperatorImpl {
 
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(mock(Config.class), mockTaskContext);
+    opImpl.init(this.context);
 
     // send a message to this operator
     MessageCollector mockCollector = mock(MessageCollector.class);
@@ -188,7 +186,7 @@ public class TestOperatorImpl {
     }
 
     @Override
-    protected void handleInit(Config config, TaskContext context) {}
+    protected void handleInit(Context context) {}
 
     @Override
     public Collection<Object> handleMessage(Object message,

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 3abd502..d760805 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -21,27 +21,16 @@ package org.apache.samza.operators.impl;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -67,15 +56,28 @@ import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.StreamTestUtils;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.TimestampedValue;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
@@ -84,7 +86,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestOperatorImplGraph {
-
   private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
     List<OperatorImpl> operators = new ArrayList<>();
     operators.add(op);
@@ -193,25 +194,39 @@ public class TestOperatorImplGraph {
     }
 
     @Override
-    public void init(Config config, TaskContext context) {
-      if (perTaskFunctionMap.get(context.getTaskName()) == null) {
-        perTaskFunctionMap.put(context.getTaskName(), new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
+    public void init(Context context) {
+      TaskName taskName = context.getTaskContext().getTaskModel().getTaskName(); // task being initialized; the taskName field is assigned from this further down
+      if (perTaskFunctionMap.get(taskName) == null) {
+        perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
       } else {
-        if (perTaskFunctionMap.get(context.getTaskName()).containsKey(opId)) {
+        if (perTaskFunctionMap.get(taskName).containsKey(opId)) {
-          throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
+          throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, taskName.getTaskName()));
         }
-        perTaskFunctionMap.get(context.getTaskName()).put(opId, this);
+        perTaskFunctionMap.get(taskName).put(opId, this);
       }
-      if (perTaskInitList.get(context.getTaskName()) == null) {
-        perTaskInitList.put(context.getTaskName(), new ArrayList<String>() { { this.add(opId); } });
+      if (perTaskInitList.get(taskName) == null) {
+        perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
       } else {
-        perTaskInitList.get(context.getTaskName()).add(opId);
+        perTaskInitList.get(taskName).add(opId);
       }
-      this.taskName = context.getTaskName();
+      this.taskName = taskName;
       this.numInitCalled++;
     }
   }
 
+  private Context context;
+
+  @Before
+  public void setup() {
+    this.context = new MockContext();
+    // individual tests can override this config if necessary
+    when(this.context.getJobContext().getConfig()).thenReturn(mock(Config.class));
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(new TaskName("task 0"));
+    when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+    when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+  }
+
   @After
   public void tearDown() {
     BaseTestFunction.reset();
@@ -220,8 +235,7 @@ public class TestOperatorImplGraph {
   @Test
   public void testEmptyChain() {
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
-    OperatorImplGraph opGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
+    OperatorImplGraph opGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), context, mock(Clock.class));
     assertEquals(0, opGraph.getAllInputOperators().size());
   }
 
@@ -242,6 +256,7 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
     StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
     Config config = new MapConfig(configs);
+    when(this.context.getJobContext().getConfig()).thenReturn(config);
 
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
         GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
@@ -256,11 +271,8 @@ public class TestOperatorImplGraph {
             .sendTo(outputStream);
       }, config);
 
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
     assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -296,6 +308,7 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
     StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
     Config config = new MapConfig(configs);
+    when(this.context.getJobContext().getConfig()).thenReturn(config);
 
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
         GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
@@ -312,21 +325,15 @@ public class TestOperatorImplGraph {
             .sendTo(outputStream);
       }, config);
 
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     JobModel jobModel = mock(JobModel.class);
     ContainerModel containerModel = mock(ContainerModel.class);
     TaskModel taskModel = mock(TaskModel.class);
     when(jobModel.getContainers()).thenReturn(Collections.singletonMap("0", containerModel));
     when(containerModel.getTasks()).thenReturn(Collections.singletonMap(new TaskName("task 0"), taskModel));
     when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
-    when(mockTaskContext.getJobModel()).thenReturn(jobModel);
-    SamzaContainerContext containerContext =
-        new SamzaContainerContext("0", config, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
-    when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
+    when(((TaskContextImpl) this.context.getTaskContext()).getJobModel()).thenReturn(jobModel);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
     assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -352,6 +359,7 @@ public class TestOperatorImplGraph {
     HashMap<String, String> configMap = new HashMap<>();
     StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName);
     Config config = new MapConfig(configMap);
+    when(this.context.getJobContext().getConfig()).thenReturn(config);
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
         GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
         GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
@@ -360,10 +368,8 @@ public class TestOperatorImplGraph {
         inputStream.map(mock(MapFunction.class));
       }, config);
 
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
     assertEquals(2, inputOpImpl.registeredOperators.size());
@@ -377,10 +383,6 @@ public class TestOperatorImplGraph {
   public void testMergeChain() {
     String inputStreamId = "input";
     String inputSystem = "input-system";
-    String inputPhysicalName = "input-stream";
-    HashMap<String, String> configs = new HashMap<>();
-    StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
-    Config config = new MapConfig(configs);
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
         GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
         GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
@@ -390,13 +392,14 @@ public class TestOperatorImplGraph {
         stream1.merge(Collections.singleton(stream2))
             .map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m));
       }, mock(Config.class));
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+
     TaskName mockTaskName = mock(TaskName.class);
-    when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(mockTaskName);
+    when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
 
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
 
     Set<OperatorImpl> opSet = opImplGraph.getAllInputOperators().stream().collect(HashSet::new,
         (s, op) -> addOperatorRecursively(s, op), HashSet::addAll);
@@ -423,6 +426,7 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputPhysicalName1);
     StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputPhysicalName2);
     Config config = new MapConfig(configs);
+    when(this.context.getJobContext().getConfig()).thenReturn(config);
 
     Integer joinKey = new Integer(1);
     Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
@@ -441,15 +445,16 @@ public class TestOperatorImplGraph {
       }, config);
 
     TaskName mockTaskName = mock(TaskName.class);
-    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
-    when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
-    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(mockTaskName);
+    when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+
     KeyValueStore mockLeftStore = mock(KeyValueStore.class);
-    when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
+    when(this.context.getTaskContext().getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
     KeyValueStore mockRightStore = mock(KeyValueStore.class);
-    when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
+    when(this.context.getTaskContext().getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
     OperatorImplGraph opImplGraph =
-        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+        new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
 
     // verify that join function is initialized once.
     assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1);
@@ -491,10 +496,12 @@ public class TestOperatorImplGraph {
     String inputStreamId2 = "input2";
     String inputSystem = "input-system";
     Config mockConfig = mock(Config.class);
+
     TaskName mockTaskName = mock(TaskName.class);
-    TaskContextImpl mockContext = mock(TaskContextImpl.class);
-    when(mockContext.getTaskName()).thenReturn(mockTaskName);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getTaskName()).thenReturn(mockTaskName);
+    when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
         GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
         GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
@@ -510,7 +517,7 @@ public class TestOperatorImplGraph {
             .map(new TestMapFunction<Object, Object>("4", mapFn));
       }, mockConfig);
 
-    OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance());
+    OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, SystemClock.instance());
 
     List<String> initializedOperators = BaseTestFunction.getInitListByTaskName(mockTaskName);
 
@@ -541,6 +548,7 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, streamId0, system, streamId0);
     StreamTestUtils.addStreamConfigs(configs, streamId1, system, streamId1);
     Config config = new MapConfig(configs);
+    when(this.context.getJobContext().getConfig()).thenReturn(config);
 
     SystemStreamPartition ssp0 = new SystemStreamPartition(system, streamId0, new Partition(0));
     SystemStreamPartition ssp1 = new SystemStreamPartition(system, streamId0, new Partition(1));
@@ -590,6 +598,7 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, outputStreamId1, outputSystem, outputStreamId1);
     StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2);
     Config config = new MapConfig(configs);
+    when(this.context.getJobContext().getConfig()).thenReturn(config);
 
     StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
         GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
@@ -640,14 +649,6 @@ public class TestOperatorImplGraph {
     String inputSystem1 = "system1";
     String inputSystem2 = "system2";
 
-    HashMap<String, String> configs = new HashMap<>();
-    configs.put(JobConfig.JOB_NAME(), "test-app");
-    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), inputSystem1);
-    StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem1, inputStreamId1);
-    StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem2, inputStreamId2);
-    StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem2, inputStreamId3);
-    Config config = new MapConfig(configs);
-
     SystemStream input1 = new SystemStream("system1", "intput1");
     SystemStream input2 = new SystemStream("system2", "intput2");
     SystemStream input3 = new SystemStream("system2", "intput3");

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index dc94e36..dfd8657 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,12 +18,10 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
@@ -69,9 +67,6 @@ public class TestSinkOperatorImpl {
   private SinkOperatorImpl createSinkOperator(SinkFunction<TestOutputMessageEnvelope> sinkFn) {
     SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
     when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    return new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
+    return new SinkOperatorImpl<>(sinkOp);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 873cd3c..ae05305 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,13 +18,11 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
@@ -45,8 +43,6 @@ public class TestStreamOperatorImpl {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
     StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
         new StreamOperatorImpl<>(mockOp);
     TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
@@ -65,8 +61,6 @@ public class TestStreamOperatorImpl {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
 
     StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
         new StreamOperatorImpl<>(mockOp);

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index d8b2e8d..9083495 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.util.Collection;
-
+import junit.framework.Assert;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -29,11 +29,10 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import junit.framework.Assert;
+import java.util.Collection;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -75,18 +74,16 @@ public class TestStreamTableJoinOperatorImpl {
             return record.getKey();
           }
         });
-    Config config = mock(Config.class);
     ReadableTable table = mock(ReadableTable.class);
     when(table.get("1")).thenReturn("r1");
     when(table.get("2")).thenReturn(null);
-    TaskContext mockTaskContext = mock(TaskContext.class);
-    when(mockTaskContext.getTable(tableId)).thenReturn(table);
+    Context context = new MockContext();
+    when(context.getTaskContext().getTable(tableId)).thenReturn(table);
 
     MessageCollector mockMessageCollector = mock(MessageCollector.class);
     TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
 
-    StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(
-        mockJoinOpSpec, config, mockTaskContext);
+    StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(mockJoinOpSpec, context);
 
     // Table has the key
     Collection<TestMessageEnvelope> result;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 7d468c9..20d5e25 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -30,13 +30,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
@@ -67,8 +69,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -77,41 +77,41 @@ import static org.mockito.Mockito.when;
 public class TestWindowOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+  private Context context;
   private Config config;
-  private TaskContextImpl taskContext;
 
   @Before
-  public void setup() throws Exception {
-    config = mock(Config.class);
-    when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
-    taskContext = mock(TaskContextImpl.class);
+  public void setup() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("job.default.system", "kafka");
+    configMap.put("job.name", "jobName");
+    configMap.put("job.id", "jobId");
+    this.config = new MapConfig(configMap);
+
+    this.context = new MockContext();
+    when(this.context.getJobContext().getConfig()).thenReturn(this.config);
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
     Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
 
-    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+    TaskModel taskModel = mock(TaskModel.class);
+    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integTestExecutionPlannerers", new Partition(0))));
-    when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    when(taskContext.getStore("jobName-jobId-window-w1"))
+    when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
+    when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+    when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
         .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
-
-    Map<String, String> mapConfig = new HashMap<>();
-    mapConfig.put("job.default.system", "kafka");
-    mapConfig.put("job.name", "jobName");
-    mapConfig.put("job.id", "jobId");
-    config = new MapConfig(mapConfig);
   }
 
   @Test
   public void testTumblingWindowsDiscardingMode() throws Exception {
-
     OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
     integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
@@ -143,8 +143,8 @@ public class TestWindowOperator {
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -163,8 +163,7 @@ public class TestWindowOperator {
 
   @Test
   public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
-
-    when(taskContext.getStore("jobName-jobId-window-w1"))
+    when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
         .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
 
     OperatorSpecGraph sgb = this.getAggregateTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
@@ -172,8 +171,8 @@ public class TestWindowOperator {
     List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
 
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
     MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
     integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
     testClock.advanceTime(Duration.ofSeconds(1));
@@ -193,8 +192,8 @@ public class TestWindowOperator {
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -222,8 +221,8 @@ public class TestWindowOperator {
         this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -267,12 +266,12 @@ public class TestWindowOperator {
     OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING,
         Duration.ofMillis(500)).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-    task.init(config, taskContext);
+    task.init(this.context);
 
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -299,8 +298,8 @@ public class TestWindowOperator {
     OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING,
         Duration.ofSeconds(1), Triggers.count(2)).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
     MessageCollector messageCollector =
@@ -343,8 +342,8 @@ public class TestWindowOperator {
     OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
         Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
     MessageCollector messageCollector =
@@ -406,8 +405,8 @@ public class TestWindowOperator {
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
 
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
 
@@ -439,17 +438,18 @@ public class TestWindowOperator {
     EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
         "integers", new Partition(0))), Collections.emptyMap());
 
-    when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
-    when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
-    when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
+        endOfStreamStates);
+    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
+        mock(WatermarkStates.class));
 
     OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
 
     TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -480,16 +480,17 @@ public class TestWindowOperator {
     EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
         "integers", new Partition(0))), Collections.emptyMap());
 
-    when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
-    when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
-    when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
+        endOfStreamStates);
+    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
+        mock(WatermarkStates.class));
 
     OperatorSpecGraph sgb =
         this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -517,16 +518,17 @@ public class TestWindowOperator {
     EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
         "integers", new Partition(0))), Collections.emptyMap());
 
-    when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
-    when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
-    when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
+        endOfStreamStates);
+    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
+        mock(WatermarkStates.class));
 
     OperatorSpecGraph sgb =
         this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
     List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
-    task.init(config, taskContext);
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+    task.init(this.context);
 
     MessageCollector messageCollector =
         envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index 860e630..6e91e2a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -31,9 +31,9 @@ import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index e1342e3..fd4a7fb 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -23,9 +23,9 @@ import java.util.Map;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index 41973b2..b73f8e3 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -20,16 +20,16 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.Scheduler;
-import org.apache.samza.operators.functions.ScheduledFunction;
-import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
+import org.apache.samza.serializers.Serde;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,7 +38,8 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 
 public class TestWindowOperatorSpec {

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 93b157a..b002e2a 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -49,7 +49,8 @@ import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -465,8 +466,10 @@ public class TestStreamProcessor {
   @Test
   public void testStreamProcessorWithStreamProcessorListenerFactory() {
     AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>();
-    StreamProcessor streamProcessor = new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class),
-        sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)), mock(JobCoordinator.class));
+    StreamProcessor streamProcessor =
+        new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), null, null,
+            sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)),
+            mock(JobCoordinator.class));
     assertEquals(streamProcessor, mockListener.get().processor);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
index d483ae6..8eff4ad 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
@@ -20,8 +20,8 @@
 package org.apache.samza.storage;
 
 import java.io.File;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemStreamPartition;
@@ -29,9 +29,15 @@ import org.apache.samza.task.MessageCollector;
 
 public class MockStorageEngineFactory implements StorageEngineFactory<Object, Object> {
   @Override
-  public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde,
-      MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition,
-      SamzaContainerContext containerContext) {
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<Object> keySerde,
+      Serde<Object> msgSerde,
+      MessageCollector collector,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext) {
     StoreProperties storeProperties = new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build();
     return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition, storeProperties);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 42f05c0..0952a87 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -18,25 +18,23 @@
  */
 package org.apache.samza.table;
 
-import java.lang.reflect.Field;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
+import junit.framework.Assert;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.MockContext;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.StorageEngine;
-import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 
-import junit.framework.Assert;
+import java.lang.reflect.Field;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
@@ -122,11 +120,11 @@ public class TestTableManager {
           });
 
     TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
-    tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class));
+    tableManager.init(new MockContext());
 
     for (int i = 0; i < 2; i++) {
       Table table = tableManager.getTable(TABLE_ID);
-      verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
+      verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject());
       verify(DummyTableProviderFactory.tableProvider, times(1)).getTable();
       Assert.assertEquals(DummyTableProviderFactory.table, table);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index ec1c915..dc13d00 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -19,19 +19,11 @@
 
 package org.apache.samza.table.caching;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -45,17 +37,24 @@ import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTableProvider;
-import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -139,15 +138,14 @@ public class TestCachingTable {
   }
 
   private void initTables(ReadableTable ... tables) {
-    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
-    TaskContext taskContext = mock(TaskContext.class);
+    Context context = new MockContext();
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
     doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
     doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
-    when(taskContext.getMetricsRegistry()).thenReturn(metricsRegistry);
+    when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(metricsRegistry);
     for (ReadableTable table : tables) {
-      table.init(containerContext, taskContext);
+      table.init(context);
     }
   }
 
@@ -160,9 +158,7 @@ public class TestCachingTable {
     }
     CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableSpec());
 
-    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
-
-    TaskContext taskContext = mock(TaskContext.class);
+    Context context = new MockContext();
     final ReadWriteTable cacheTable = getMockCache().getLeft();
 
     final ReadWriteTable realTable = mock(ReadWriteTable.class);
@@ -185,11 +181,11 @@ public class TestCachingTable {
 
         Assert.fail();
         return null;
-      }).when(taskContext).getTable(anyString());
+      }).when(context.getTaskContext()).getTable(anyString());
 
-    when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+    when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
 
-    tableProvider.init(containerContext, taskContext);
+    tableProvider.init(context);
 
     CachingTable cachingTable = (CachingTable) tableProvider.getTable();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 3e844c3..571f87b 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -19,17 +19,9 @@
 
 package org.apache.samza.table.remote;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.samza.container.SamzaContainerContext;
+import junit.framework.Assert;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -38,11 +30,18 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.task.TaskContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import junit.framework.Assert;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollection;
@@ -57,14 +56,14 @@ import static org.mockito.Mockito.verify;
 public class TestRemoteTable {
   private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
 
-  public static TaskContext getMockTaskContext() {
+  public static Context getMockContext() {
+    Context context = new MockContext();
     MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
     doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
     doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
     doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
-    TaskContext taskContext = mock(TaskContext.class);
-    doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
-    return taskContext;
+    doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+    return context;
   }
 
   private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
@@ -89,11 +88,9 @@ public class TestRemoteTable {
       table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
     }
 
-    TaskContext taskContext = getMockTaskContext();
-
-    SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+    Context context = getMockContext();
 
-    table.init(containerContext, taskContext);
+    table.init(context);
 
     return (T) table;
   }