You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/03/06 18:39:07 UTC
[samza] branch master updated: SAMZA-1935: Refactor TaskContextImpl
to not include access to objects that are only used internally
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new dace47b SAMZA-1935: Refactor TaskContextImpl to not include access to objects that are only used internally
dace47b is described below
commit dace47b94e7d2c3ef1d171ab922035eaa7bb3fc9
Author: Cyril Pottiez <cy...@hotmail.fr>
AuthorDate: Wed Mar 6 10:39:05 2019 -0800
SAMZA-1935: Refactor TaskContextImpl to not include access to objects that are only used internally
We created a new class called JobContextMetadata (named `InternalTaskContext` in the final patch below), which holds the methods `registerObject`, `fetchObject`, `getJobModel` and `getStreamMetadataCache`. It is instantiated in `OperatorImplGraph` and passed to each `OperatorImpl`.
The class `EmbeddedTaggedRateLimiter` implements the public API `RateLimiter`, which has a method called `init` that takes a `Context` object as its input parameter. This means that the `Context` object is the only source from which the `JobModel` and the `StreamMetadataCache` can be fetched. Hence, the methods `getJobModel` and `getStreamMetadataCache` are still needed outside of `JobContextMetadata`. Therefore, only copies of these methods were moved to `JobContextMetadata`. This means that the `StreamMetadat [...]
Author: Cyril Pottiez <cy...@hotmail.fr>
Author: Sara Ersson <sa...@kth.se>
Reviewers: Cameron Lee <ca...@linkedin.com>
Closes #934 from vwidin/testing
---
.../apache/samza/context/InternalTaskContext.java | 56 ++++++++++++++++++++++
.../org/apache/samza/context/TaskContextImpl.java | 13 -----
.../apache/samza/operators/impl/OperatorImpl.java | 18 +++----
.../samza/operators/impl/OperatorImplGraph.java | 28 ++++++-----
.../operators/impl/PartitionByOperatorImpl.java | 8 ++--
.../samza/context/TestInternalTaskContext.java | 47 ++++++++++++++++++
.../apache/samza/context/TestTaskContextImpl.java | 13 +----
.../samza/operators/impl/TestOperatorImpl.java | 27 +++++++----
.../samza/operators/impl/TestWindowOperator.java | 26 +---------
9 files changed, 153 insertions(+), 83 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
new file mode 100644
index 0000000..3d3a53d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.context;
+
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.system.StreamMetadataCache;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InternalTaskContext {
+
+ private final Context context;
+ private final Map<String, Object> objectRegistry = new HashMap<>();
+
+ public InternalTaskContext(Context context) {
+ this.context = context;
+ }
+
+ public void registerObject(String name, Object value) {
+ this.objectRegistry.put(name, value);
+ }
+
+ public Object fetchObject(String name) {
+ return this.objectRegistry.get(name);
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ public JobModel getJobModel() {
+ return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
+ }
+
+ public StreamMetadataCache getStreamMetadataCache() {
+ return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index a29c2b3..edec17d 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -29,8 +29,6 @@ import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.TableManager;
-import java.util.HashMap;
-import java.util.Map;
import java.util.function.Function;
@@ -43,7 +41,6 @@ public class TaskContextImpl implements TaskContext {
private final OffsetManager offsetManager;
private final JobModel jobModel;
private final StreamMetadataCache streamMetadataCache;
- private final Map<String, Object> objectRegistry = new HashMap<>();
public TaskContextImpl(TaskModel taskModel,
MetricsRegistry taskMetricsRegistry,
@@ -97,16 +94,6 @@ public class TaskContextImpl implements TaskContext {
this.offsetManager.setStartingOffset(this.taskModel.getTaskName(), systemStreamPartition, offset);
}
- // TODO SAMZA-1935: below methods are used by operator code; they should be decoupled from this client API
-
- public void registerObject(String name, Object value) {
- this.objectRegistry.put(name, value);
- }
-
- public Object fetchObject(String name) {
- return this.objectRegistry.get(name);
- }
-
public JobModel getJobModel() {
return this.jobModel;
}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 728a171..276e8c3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -24,7 +24,8 @@ import org.apache.samza.config.MetricsConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.context.InternalTaskContext;
+import org.apache.samza.context.TaskContext;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
@@ -89,9 +90,11 @@ public abstract class OperatorImpl<M, RM> {
/**
* Initialize this {@link OperatorImpl} and its user-defined functions.
*
- * @param context the {@link Context} for the task
+ * @param internalTaskContext the {@link InternalTaskContext} for the task
*/
- public final void init(Context context) {
+ public final void init(InternalTaskContext internalTaskContext) {
+ final Context context = internalTaskContext.getContext();
+
String opId = getOpImplId();
if (initialized) {
@@ -113,12 +116,11 @@ public abstract class OperatorImpl<M, RM> {
this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-message-ns");
this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opId + "-handle-timer-ns");
- // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else
- final TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
+ final TaskContext taskContext = context.getTaskContext();
this.taskName = taskContext.getTaskModel().getTaskName();
- this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
- this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
- this.controlMessageSender = new ControlMessageSender(taskContext.getStreamMetadataCache());
+ this.eosStates = (EndOfStreamStates) internalTaskContext.fetchObject(EndOfStreamStates.class.getName());
+ this.watermarkStates = (WatermarkStates) internalTaskContext.fetchObject(WatermarkStates.class.getName());
+ this.controlMessageSender = new ControlMessageSender(internalTaskContext.getStreamMetadataCache());
this.taskModel = taskContext.getTaskModel();
this.callbackScheduler = taskContext.getCallbackScheduler();
handleInit(context);
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index e668b91..9f33356 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -24,7 +24,7 @@ import com.google.common.collect.Multimap;
import org.apache.samza.config.Config;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.context.InternalTaskContext;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.OperatorSpecGraph;
@@ -86,6 +86,8 @@ public class OperatorImplGraph {
private final Clock clock;
+ private InternalTaskContext internalTaskContext;
+
/**
* Constructs the DAG of {@link OperatorImpl}s corresponding to the the DAG of {@link OperatorSpec}s
* in the {@code specGraph}.
@@ -97,12 +99,11 @@ public class OperatorImplGraph {
public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clock) {
this.clock = clock;
StreamConfig streamConfig = new StreamConfig(context.getJobContext().getConfig());
- // TODO SAMZA-1935: the objects that are only accessible through TaskContextImpl should be moved somewhere else
- TaskContextImpl taskContext = (TaskContextImpl) context.getTaskContext();
+ this.internalTaskContext = new InternalTaskContext(context);
Map<SystemStream, Integer> producerTaskCounts =
hasIntermediateStreams(specGraph)
? getProducerTaskCountForIntermediateStreams(
- getStreamToConsumerTasks(taskContext.getJobModel()),
+ getStreamToConsumerTasks(internalTaskContext.getJobModel()),
getIntermediateToInputStreamsMap(specGraph, streamConfig))
: Collections.EMPTY_MAP;
producerTaskCounts.forEach((stream, count) -> {
@@ -110,12 +111,16 @@ public class OperatorImplGraph {
});
// set states for end-of-stream
- taskContext.registerObject(EndOfStreamStates.class.getName(),
- new EndOfStreamStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts));
+ internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
+ new EndOfStreamStates(
+ internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
+ producerTaskCounts));
// set states for watermark
- taskContext.registerObject(WatermarkStates.class.getName(),
- new WatermarkStates(taskContext.getTaskModel().getSystemStreamPartitions(), producerTaskCounts,
- context.getContainerContext().getContainerMetricsRegistry()));
+ internalTaskContext.registerObject(WatermarkStates.class.getName(),
+ new WatermarkStates(
+ internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
+ producerTaskCounts,
+ context.getContainerContext().getContainerMetricsRegistry()));
specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
@@ -166,7 +171,7 @@ public class OperatorImplGraph {
// Either this is the first time we've seen this operatorSpec, or this is a join operator spec
// and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, context);
- operatorImpl.init(context);
+ operatorImpl.init(this.internalTaskContext);
operatorImpl.registerInputStream(inputStream);
if (operatorSpec.getScheduledFn() != null) {
@@ -223,7 +228,8 @@ public class OperatorImplGraph {
} else if (operatorSpec instanceof PartitionByOperatorSpec) {
String streamId = ((PartitionByOperatorSpec) operatorSpec).getOutputStream().getStreamId();
SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
- return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream, context);
+ return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream,
+ internalTaskContext);
} else if (operatorSpec instanceof WindowOperatorSpec) {
return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
} else if (operatorSpec instanceof JoinOperatorSpec) {
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 88644ce..134a517 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -19,7 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.context.InternalTaskContext;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
@@ -49,13 +49,13 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
private final ControlMessageSender controlMessageSender;
PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
- SystemStream systemStream, Context context) {
+ SystemStream systemStream, InternalTaskContext internalTaskContext) {
this.partitionByOpSpec = partitionByOpSpec;
this.systemStream = systemStream;
this.keyFunction = partitionByOpSpec.getKeyFunction();
this.valueFunction = partitionByOpSpec.getValueFunction();
- this.taskName = context.getTaskContext().getTaskModel().getTaskName().getTaskName();
- StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context.getTaskContext()).getStreamMetadataCache();
+ this.taskName = internalTaskContext.getContext().getTaskContext().getTaskModel().getTaskName().getTaskName();
+ StreamMetadataCache streamMetadataCache = internalTaskContext.getStreamMetadataCache();
this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
}
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestInternalTaskContext.java b/samza-core/src/test/java/org/apache/samza/context/TestInternalTaskContext.java
new file mode 100644
index 0000000..04155e6
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/context/TestInternalTaskContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestInternalTaskContext {
+
+ private InternalTaskContext internalTaskContext;
+
+ @Before
+ public void setup() {
+ internalTaskContext = new InternalTaskContext(null);
+ }
+
+ /**
+ * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject
+ * should return null.
+ */
+ @Test
+ public void testRegisterAndFetchObject() {
+ String value = "hello world";
+ internalTaskContext.registerObject("key", value);
+ assertEquals(value, internalTaskContext.fetchObject("key"));
+ assertNull(internalTaskContext.fetchObject("not a key"));
+ }
+
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 3d3803b..0e8f78e 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -34,7 +34,6 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -97,15 +96,5 @@ public class TestTaskContextImpl {
verify(offsetManager).setStartingOffset(TASK_NAME, ssp, "123");
}
- /**
- * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject
- * should return null.
- */
- @Test
- public void testRegisterAndFetchObject() {
- String value = "hello world";
- taskContext.registerObject("key", value);
- assertEquals(value, taskContext.fetchObject("key"));
- assertNull(taskContext.fetchObject("not a key"));
- }
+
}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index d5d7f3d..6279960 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.samza.context.Context;
+import org.apache.samza.context.InternalTaskContext;
import org.apache.samza.context.MockContext;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Counter;
@@ -47,10 +48,16 @@ import static org.mockito.Mockito.when;
public class TestOperatorImpl {
private Context context;
+ private InternalTaskContext internalTaskContext;
@Before
public void setup() {
this.context = new MockContext();
+ this.internalTaskContext = mock(InternalTaskContext.class);
+ when(this.internalTaskContext.getContext()).thenReturn(this.context);
+ // might be necessary in the future
+ when(this.internalTaskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(mock(EndOfStreamStates.class));
+ when(this.internalTaskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(this.context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -59,8 +66,8 @@ public class TestOperatorImpl {
@Test(expected = IllegalStateException.class)
public void testMultipleInitShouldThrow() {
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
- opImpl.init(this.context);
- opImpl.init(this.context);
+ opImpl.init(this.internalTaskContext);
+ opImpl.init(this.internalTaskContext);
}
@Test(expected = IllegalStateException.class)
@@ -73,19 +80,19 @@ public class TestOperatorImpl {
public void testOnMessagePropagatesResults() {
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(this.context);
+ opImpl.init(this.internalTaskContext);
// register a couple of operators
OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl1.init(this.context);
+ mockNextOpImpl1.init(this.internalTaskContext);
opImpl.registerNextOperator(mockNextOpImpl1);
OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl2.init(this.context);
+ mockNextOpImpl2.init(this.internalTaskContext);
opImpl.registerNextOperator(mockNextOpImpl2);
// send a message to this operator
@@ -109,7 +116,7 @@ public class TestOperatorImpl {
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(this.context);
+ opImpl.init(this.internalTaskContext);
// send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
@@ -125,19 +132,19 @@ public class TestOperatorImpl {
public void testOnTimerPropagatesResultsAndTimer() {
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(this.context);
+ opImpl.init(this.internalTaskContext);
// register a couple of operators
OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl1.init(this.context);
+ mockNextOpImpl1.init(this.internalTaskContext);
opImpl.registerNextOperator(mockNextOpImpl1);
OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl2.init(this.context);
+ mockNextOpImpl2.init(this.internalTaskContext);
opImpl.registerNextOperator(mockNextOpImpl2);
// send a timer tick to this operator
@@ -165,7 +172,7 @@ public class TestOperatorImpl {
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(this.context);
+ opImpl.init(this.internalTaskContext);
// send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 0b240f1..c588b3c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,7 +36,6 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.context.MockContext;
-import org.apache.samza.context.TaskContextImpl;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.job.model.TaskModel;
@@ -95,7 +93,7 @@ public class TestWindowOperator {
TaskModel taskModel = mock(TaskModel.class);
when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
- .of(new SystemStreamPartition("kafka", "integTestExecutionPlannerers", new Partition(0))));
+ .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
@@ -436,13 +434,6 @@ public class TestWindowOperator {
@Test
public void testEndOfStreamFlushesWithEarlyTriggerFirings() throws Exception {
- EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
- "integers", new Partition(0))), Collections.emptyMap());
-
- when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
- endOfStreamStates);
- when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
- mock(WatermarkStates.class));
OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
@@ -478,14 +469,6 @@ public class TestWindowOperator {
@Test
public void testEndOfStreamFlushesWithDefaultTriggerFirings() throws Exception {
- EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
- "integers", new Partition(0))), Collections.emptyMap());
-
- when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
- endOfStreamStates);
- when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
- mock(WatermarkStates.class));
-
OperatorSpecGraph sgb =
this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
@@ -516,13 +499,6 @@ public class TestWindowOperator {
@Test
public void testEndOfStreamFlushesWithNoTriggerFirings() throws Exception {
- EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
- "integers", new Partition(0))), Collections.emptyMap());
-
- when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
- endOfStreamStates);
- when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
- mock(WatermarkStates.class));
OperatorSpecGraph sgb =
this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();