You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/03/06 18:39:07 UTC

[samza] branch master updated: SAMZA-1935: Refactor TaskContextImpl to not include access to objects that are only used internally

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new dace47b  SAMZA-1935: Refactor TaskContextImpl to not include access to objects that are only used internally
dace47b is described below

commit dace47b94e7d2c3ef1d171ab922035eaa7bb3fc9
Author: Cyril Pottiez <cy...@hotmail.fr>
AuthorDate: Wed Mar 6 10:39:05 2019 -0800

    SAMZA-1935: Refactor TaskContextImpl to not include access to objects that are only used internally
    
    We created a new class called JobContextMetadata, which holds the methods `registerObject`, `fetchObject`, `getJobModel` and `getStreamMetadataCache`. It is instantiated in `OperatorImplGraph` and passed to multiple `OperatorImpl` instances.
    
    The class `EmbeddedTaggedRateLimiter` implements the public API `RateLimiter`, which has a method called `init` that takes a `Context` object as its input parameter. This means that the Context object is the only source from which the JobModel and the `StreamMetadataCache` can be fetched. Hence, the methods `getJobModel` and `getStreamMetadataCache` are needed outside of `JobContextMetadata`. Therefore only copies of these methods were moved to `JobContextMetadata`. This means that the `StreamMetadat [...]
    
    Author: Cyril Pottiez <cy...@hotmail.fr>
    Author: Sara Ersson <sa...@kth.se>
    
    Reviewers: Cameron Lee <ca...@linkedin.com>
    
    Closes #934 from vwidin/testing
---
 .../apache/samza/context/InternalTaskContext.java  | 56 ++++++++++++++++++++++
 .../org/apache/samza/context/TaskContextImpl.java  | 13 -----
 .../apache/samza/operators/impl/OperatorImpl.java  | 18 +++----
 .../samza/operators/impl/OperatorImplGraph.java    | 28 ++++++-----
 .../operators/impl/PartitionByOperatorImpl.java    |  8 ++--
 .../samza/context/TestInternalTaskContext.java     | 47 ++++++++++++++++++
 .../apache/samza/context/TestTaskContextImpl.java  | 13 +----
 .../samza/operators/impl/TestOperatorImpl.java     | 27 +++++++----
 .../samza/operators/impl/TestWindowOperator.java   | 26 +---------
 9 files changed, 153 insertions(+), 83 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
new file mode 100644
index 0000000..3d3a53d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.context;
+
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.system.StreamMetadataCache;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InternalTaskContext {
+
+  private final Context context;
+  private final Map<String, Object> objectRegistry = new HashMap<>();
+
+  public InternalTaskContext(Context context) {
+    this.context = context;
+  }
+
+  public void registerObject(String name, Object value) {
+    this.objectRegistry.put(name, value);
+  }
+
+  public Object fetchObject(String name) {
+    return this.objectRegistry.get(name);
+  }
+
+  public Context getContext() {
+    return context;
+  }
+
+  public JobModel getJobModel() {
+    return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
+  }
+
+  public StreamMetadataCache getStreamMetadataCache() {
+    return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index a29c2b3..edec17d 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -29,8 +29,6 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.function.Function;
 
 
@@ -43,7 +41,6 @@ public class TaskContextImpl implements TaskContext {
   private final OffsetManager offsetManager;
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
-  private final Map<String, Object> objectRegistry = new HashMap<>();
 
   public TaskContextImpl(TaskModel taskModel,
       MetricsRegistry taskMetricsRegistry,
@@ -97,16 +94,6 @@ public class TaskContextImpl implements TaskContext {
     this.offsetManager.setStartingOffset(this.taskModel.getTaskName(), systemStreamPartition, offset);
   }
 
-  // TODO SAMZA-1935: below methods are used by operator code; they should be decoupled from this client API
-
-  public void registerObject(String name, Object value) {
-    this.objectRegistry.put(name, value);
-  }
-
-  public Object fetchObject(String name) {
-    return this.objectRegistry.get(name);
-  }
-
   public JobModel getJobModel() {
     return this.jobModel;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 728a171..276e8c3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -24,7 +24,8 @@ import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.context.InternalTaskContext;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -89,9 +90,11 @@ public abstract class OperatorImpl<M, RM> {
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
    *
-   * @param context the {@link Context} for the task
+   * @param internalTaskContext the {@link InternalTaskContext} for the task
    */
-  public final void init(Context context) {
+  public final void init(InternalTaskContext internalTaskContext) {
+    final Context context = internalTaskContext.getContext();
+
     String opId = getOpImplId();
 
     if (initialized) {
@@ -113,12 +116,11 @@ public abstract class OperatorImpl<M, RM> {
     this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns");
     this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns");
 
-    // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else
-    final TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
+    final TaskContext taskContext =  context.getTaskContext();
     this.taskName = taskContext.getTaskModel().getTaskName();
-    this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
-    this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
-    this.controlMessageSender = new ControlMessageSender(taskContext.getStreamMetadataCache());
+    this.eosStates = (EndOfStreamStates) internalTaskContext.fetchObject(EndOfStreamStates.class.getName());
+    this.watermarkStates = (WatermarkStates) internalTaskContext.fetchObject(WatermarkStates.class.getName());
+    this.controlMessageSender = new ControlMessageSender(internalTaskContext.getStreamMetadataCache());
     this.taskModel = taskContext.getTaskModel();
     this.callbackScheduler = taskContext.getCallbackScheduler();
     handleInit(context);
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index e668b91..9f33356 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -24,7 +24,7 @@ import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.context.InternalTaskContext;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.OperatorSpecGraph;
@@ -86,6 +86,8 @@ public class OperatorImplGraph {
 
   private final Clock clock;
 
+  private InternalTaskContext internalTaskContext;
+
   /**
    * Constructs the DAG of {@link OperatorImpl}s corresponding to the the DAG of {@link OperatorSpec}s
    * in the {@code specGraph}.
@@ -97,12 +99,11 @@ public class OperatorImplGraph {
   public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) {
     this.clock = clock;
     StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig());
-    // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else
-    TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
+    this.internalTaskContext = new InternalTaskContext(context);
     Map<SystemStream, Integer> producerTaskCounts =
         hasIntermediateStreams(specGraph)
             ? getProducerTaskCountForIntermediateStreams(
-                getStreamToConsumerTasks(taskContext.getJobModel()),
+                getStreamToConsumerTasks(internalTaskContext.getJobModel()),
                 getIntermediateToInputStreamsMap(specGraph, streamConfig))
             : Collections.EMPTY_MAP;
     producerTaskCounts.forEach((stream, count) -> {
@@ -110,12 +111,16 @@ public class OperatorImplGraph {
       });
 
     // set states for end-of-stream
-    taskContext.registerObject(EndOfStreamStates.class.getName(),
-        new EndOfStreamStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts));
+    internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
+        new EndOfStreamStates(
+                internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
+                producerTaskCounts));
     // set states for watermark
-    taskContext.registerObject(WatermarkStates.class.getName(),
-        new WatermarkStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts,
-            context.getContainerContext().getContainerMetricsRegistry()));
+    internalTaskContext.registerObject(WatermarkStates.class.getName(),
+        new WatermarkStates(
+                internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
+                producerTaskCounts,
+                context.getContainerContext().getContainerMetricsRegistry()));
 
     specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
         SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
@@ -166,7 +171,7 @@ public class OperatorImplGraph {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
       OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, context);
-      operatorImpl.init(context);
+      operatorImpl.init(this.internalTaskContext);
       operatorImpl.registerInputStream(inputStream);
 
       if (operatorSpec.getScheduledFn() != null) {
@@ -223,7 +228,8 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof PartitionByOperatorSpec) {
       String streamId = ((PartitionByOperatorSpec) operatorSpec).getOutputStream().getStreamId();
       SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
-      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream, context);
+      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream,
+              internalTaskContext);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 88644ce..134a517 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.context.InternalTaskContext;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -49,13 +49,13 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   private final ControlMessageSender controlMessageSender;
 
   PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
-      SystemStream systemStream, Context context) {
+      SystemStream systemStream, InternalTaskContext internalTaskContext) {
     this.partitionByOpSpec = partitionByOpSpec;
     this.systemStream = systemStream;
     this.keyFunction = partitionByOpSpec.getKeyFunction();
     this.valueFunction = partitionByOpSpec.getValueFunction();
-    this.taskName = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
-    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context.getTaskContext()).getStreamMetadataCache();
+    this.taskName = internalTaskContext.getContext().getTaskContext().getTaskModel().getTaskName().getTaskName();
+    StreamMetadataCache streamMetadataCache = internalTaskContext.getStreamMetadataCache();
     this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestInternalTaskContext.java b/samza-core/src/test/java/org/apache/samza/context/TestInternalTaskContext.java
new file mode 100644
index 0000000..04155e6
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/context/TestInternalTaskContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestInternalTaskContext {
+
+  private InternalTaskContext internalTaskContext;
+
+  @Before
+  public void setup() {
+    internalTaskContext = new InternalTaskContext(null);
+  }
+
+  /**
+   * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject
+   * should return null.
+   */
+  @Test
+  public void testRegisterAndFetchObject() {
+    String value = "hello world";
+    internalTaskContext.registerObject("key", value);
+    assertEquals(value, internalTaskContext.fetchObject("key"));
+    assertNull(internalTaskContext.fetchObject("not a key"));
+  }
+
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 3d3803b..0e8f78e 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -34,7 +34,6 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -97,15 +96,5 @@ public class TestTaskContextImpl {
     verify(offsetManager).setStartingOffset(TASK_NAME, ssp, "123");
   }
 
-  /**
-   * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject
-   * should return null.
-   */
-  @Test
-  public void testRegisterAndFetchObject() {
-    String value = "hello world";
-    taskContext.registerObject("key", value);
-    assertEquals(value, taskContext.fetchObject("key"));
-    assertNull(taskContext.fetchObject("not a key"));
-  }
+
 }
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index d5d7f3d..6279960 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import org.apache.samza.context.Context;
+import org.apache.samza.context.InternalTaskContext;
 import org.apache.samza.context.MockContext;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Counter;
@@ -47,10 +48,16 @@ import static org.mockito.Mockito.when;
 
 public class TestOperatorImpl {
   private Context context;
+  private InternalTaskContext internalTaskContext;
 
   @Before
   public void setup() {
     this.context = new MockContext();
+    this.internalTaskContext = mock(InternalTaskContext.class);
+    when(this.internalTaskContext.getContext()).thenReturn(this.context);
+    // might be necessary in the future
+    when(this.internalTaskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(mock(EndOfStreamStates.class));
+    when(this.internalTaskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
     when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
     when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -59,8 +66,8 @@ public class TestOperatorImpl {
   @Test(expected = IllegalStateException.class)
   public void testMultipleInitShouldThrow() {
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
-    opImpl.init(this.context);
-    opImpl.init(this.context);
+    opImpl.init(this.internalTaskContext);
+    opImpl.init(this.internalTaskContext);
   }
 
   @Test(expected = IllegalStateException.class)
@@ -73,19 +80,19 @@ public class TestOperatorImpl {
   public void testOnMessagePropagatesResults() {
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(this.context);
+    opImpl.init(this.internalTaskContext);
 
     // register a couple of operators
     OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
     when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl1.init(this.context);
+    mockNextOpImpl1.init(this.internalTaskContext);
     opImpl.registerNextOperator(mockNextOpImpl1);
 
     OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
     when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl2.init(this.context);
+    mockNextOpImpl2.init(this.internalTaskContext);
     opImpl.registerNextOperator(mockNextOpImpl2);
 
     // send a message to this operator
@@ -109,7 +116,7 @@ public class TestOperatorImpl {
 
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(this.context);
+    opImpl.init(this.internalTaskContext);
 
     // send a message to this operator
     MessageCollector mockCollector = mock(MessageCollector.class);
@@ -125,19 +132,19 @@ public class TestOperatorImpl {
   public void testOnTimerPropagatesResultsAndTimer() {
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(this.context);
+    opImpl.init(this.internalTaskContext);
 
     // register a couple of operators
     OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
     when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl1.init(this.context);
+    mockNextOpImpl1.init(this.internalTaskContext);
     opImpl.registerNextOperator(mockNextOpImpl1);
 
     OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
     when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
     when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
-    mockNextOpImpl2.init(this.context);
+    mockNextOpImpl2.init(this.internalTaskContext);
     opImpl.registerNextOperator(mockNextOpImpl2);
 
     // send a timer tick to this operator
@@ -165,7 +172,7 @@ public class TestOperatorImpl {
 
     Object mockTestOpImplOutput = mock(Object.class);
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
-    opImpl.init(this.context);
+    opImpl.init(this.internalTaskContext);
 
     // send a message to this operator
     MessageCollector mockCollector = mock(MessageCollector.class);
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 0b240f1..c588b3c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +36,6 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
-import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 import org.apache.samza.job.model.TaskModel;
@@ -95,7 +93,7 @@ public class TestWindowOperator {
 
     TaskModel taskModel = mock(TaskModel.class);
     when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integTestExecutionPlannerers", new Partition(0))));
+        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
     when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
     when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
     when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -436,13 +434,6 @@ public class TestWindowOperator {
 
   @Test
   public void testEndOfStreamFlushesWithEarlyTriggerFirings() throws Exception {
-    EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
-        "integers", new Partition(0))), Collections.emptyMap());
-
-    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
-        endOfStreamStates);
-    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
-        mock(WatermarkStates.class));
 
     OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
         Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
@@ -478,14 +469,6 @@ public class TestWindowOperator {
 
   @Test
   public void testEndOfStreamFlushesWithDefaultTriggerFirings() throws Exception {
-    EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
-        "integers", new Partition(0))), Collections.emptyMap());
-
-    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
-        endOfStreamStates);
-    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
-        mock(WatermarkStates.class));
-
     OperatorSpecGraph sgb =
         this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
     TestClock testClock = new TestClock();
@@ -516,13 +499,6 @@ public class TestWindowOperator {
 
   @Test
   public void testEndOfStreamFlushesWithNoTriggerFirings() throws Exception {
-    EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
-        "integers", new Partition(0))), Collections.emptyMap());
-
-    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
-        endOfStreamStates);
-    when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
-        mock(WatermarkStates.class));
 
     OperatorSpecGraph sgb =
         this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();