You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/10 22:34:26 UTC
[3/5] samza git commit: SAMZA-1714: Creating shared context factory
for shared context objects
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
index 84f5dbb..de16ef2 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
@@ -19,16 +19,17 @@
package org.apache.samza.application;
import com.google.common.collect.ImmutableList;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
@@ -521,11 +522,35 @@ public class TestStreamApplicationDescriptorImpl {
}
@Test
- public void testContextManager() {
- ContextManager cntxMan = mock(ContextManager.class);
- StreamApplication testApp = appDesc -> appDesc.withContextManager(cntxMan);
+ public void testApplicationContainerContextFactory() { // a factory set on the descriptor is exposed as a present Optional
+ ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
+ StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.of(factory), appSpec.getApplicationContainerContextFactory()); // expected first, then actual
+ }
+
+ @Test
+ public void testNoApplicationContainerContextFactory() { // with no factory configured the getter yields Optional.empty()
+ StreamApplication testApp = appDesc -> { // deliberately configures nothing
+ };
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.empty(), appSpec.getApplicationContainerContextFactory()); // expected first, then actual
+ }
+
+ @Test
+ public void testApplicationTaskContextFactory() { // a factory set on the descriptor is exposed as a present Optional
+ ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
+ StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.of(factory), appSpec.getApplicationTaskContextFactory()); // expected first, then actual
+ }
+
+ @Test
+ public void testNoApplicationTaskContextFactory() { // with no factory configured the getter yields Optional.empty()
+ StreamApplication testApp = appDesc -> { // deliberately configures nothing
+ };
StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
- assertEquals(appSpec.getContextManager(), cntxMan);
+ assertEquals(Optional.empty(), appSpec.getApplicationTaskContextFactory()); // expected first, then actual
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
index abe5ce1..e79e25b 100644
--- a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
@@ -21,10 +21,12 @@ package org.apache.samza.application;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
@@ -127,13 +129,35 @@ public class TestTaskApplicationDescriptorImpl {
}
@Test
- public void testContextManager() {
- ContextManager cntxMan = mock(ContextManager.class);
+ public void testApplicationContainerContextFactory() { // a factory set on the descriptor is exposed as a present Optional
+ ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
+ TaskApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
+ TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.of(factory), appSpec.getApplicationContainerContextFactory()); // expected first, then actual
+ }
+
+ @Test
+ public void testNoApplicationContainerContextFactory() { // with no factory configured the getter yields Optional.empty()
TaskApplication testApp = appDesc -> {
- appDesc.withContextManager(cntxMan);
};
- TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
- assertEquals(appDesc.getContextManager(), cntxMan);
+ TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.empty(), appSpec.getApplicationContainerContextFactory()); // expected first, then actual
+ }
+
+ @Test
+ public void testApplicationTaskContextFactory() { // a factory set on the descriptor is exposed as a present Optional
+ ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
+ TaskApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
+ TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.of(factory), appSpec.getApplicationTaskContextFactory()); // expected first, then actual
+ }
+
+ @Test
+ public void testNoApplicationTaskContextFactory() { // with no factory configured the getter yields Optional.empty()
+ TaskApplication testApp = appDesc -> { // deliberately configures nothing
+ };
+ TaskApplicationDescriptorImpl appSpec = new TaskApplicationDescriptorImpl(testApp, mock(Config.class));
+ assertEquals(Optional.empty(), appSpec.getApplicationTaskContextFactory()); // expected first, then actual
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/MockContext.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/MockContext.java b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
new file mode 100644
index 0000000..778d486
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/context/MockContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.context;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+
+import static org.mockito.Mockito.*;
+
+
+public class MockContext implements Context { // test helper: a Context whose parts are all Mockito mocks
+ private final JobContext jobContext = mock(JobContext.class);
+ private final ContainerContext containerContext = mock(ContainerContext.class);
+ /**
+ * Mocked as the concrete {@link TaskContextImpl}, not only the interface, because some tests need more than the interface.
+ */
+ private final TaskContextImpl taskContext = mock(TaskContextImpl.class);
+ private final ApplicationContainerContext applicationContainerContext = mock(ApplicationContainerContext.class);
+ private final ApplicationTaskContext applicationTaskContext = mock(ApplicationTaskContext.class);
+
+ public MockContext() { // convenience constructor: delegates with a default-constructed MapConfig
+ this(new MapConfig());
+ }
+
+ /**
+ * @param config returned by {@code getJobContext().getConfig()}; config is widely used, so it is wired in here
+ */
+ public MockContext(Config config) {
+ when(this.jobContext.getConfig()).thenReturn(config);
+ }
+
+ @Override
+ public JobContext getJobContext() {
+ return jobContext;
+ }
+
+ @Override
+ public ContainerContext getContainerContext() {
+ return containerContext;
+ }
+
+ @Override
+ public TaskContext getTaskContext() { // declared as the interface; the object returned is the TaskContextImpl mock
+ return taskContext;
+ }
+
+ @Override
+ public ApplicationContainerContext getApplicationContainerContext() {
+ return applicationContainerContext;
+ }
+
+ @Override
+ public ApplicationTaskContext getApplicationTaskContext() {
+ return applicationTaskContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
index 33ad3a5..40526db 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
@@ -18,9 +18,11 @@
*/
package org.apache.samza.context;
+import java.util.Optional;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
public class TestContextImpl {
@@ -63,11 +65,17 @@ public class TestContextImpl {
}
private static Context buildWithApplicationContainerContext(ApplicationContainerContext applicationContainerContext) {
- return new ContextImpl(null, null, null, applicationContainerContext, null);
+ return buildWithApplicationContext(applicationContainerContext, mock(ApplicationTaskContext.class));
}
private static Context buildWithApplicationTaskContext(ApplicationTaskContext applicationTaskContext) {
- return new ContextImpl(null, null, null, null, applicationTaskContext);
+ return buildWithApplicationContext(mock(ApplicationContainerContext.class), applicationTaskContext);
+ }
+
+ private static Context buildWithApplicationContext(ApplicationContainerContext applicationContainerContext,
+ ApplicationTaskContext applicationTaskContext) { // shared builder behind the two single-argument helpers in this class
+ return new ContextImpl(mock(JobContext.class), mock(ContainerContext.class), mock(TaskContext.class), // framework contexts are plain mocks
+ Optional.ofNullable(applicationContainerContext), Optional.ofNullable(applicationTaskContext)); // a null argument becomes Optional.empty()
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 78f886c..3d3803b 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -34,6 +34,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -62,7 +63,7 @@ public class TestTaskContextImpl {
MockitoAnnotations.initMocks(this);
taskContext =
new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
- offsetManager);
+ offsetManager, null, null);
when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
}
@@ -95,4 +96,16 @@ public class TestTaskContextImpl {
taskContext.setStartingOffset(ssp, "123");
verify(offsetManager).setStartingOffset(TASK_NAME, ssp, "123");
}
+
+ /**
+ * Given a registered object, fetchObject should get it. If an object is not registered at a key, then fetchObject
+ * should return null.
+ */
+ @Test
+ public void testRegisterAndFetchObject() {
+ String key = "key"; // the only key this test registers
+ taskContext.registerObject(key, "hello world");
+ assertEquals("hello world", taskContext.fetchObject(key)); // registered value comes back
+ assertNull(taskContext.fetchObject("not a key")); // a key that was never registered yields null
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
index 51a9523..4618e52 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNodeConfigurationGenerator.java
@@ -19,14 +19,6 @@
package org.apache.samza.execution;
import com.google.common.base.Joiner;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.application.TaskApplicationDescriptorImpl;
import org.apache.samza.config.Config;
@@ -36,7 +28,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.TaskConfigJava;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableDescriptor;
@@ -52,9 +44,17 @@ import org.apache.samza.table.Table;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
import org.apache.samza.table.TableSpec;
-import org.apache.samza.task.TaskContext;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -445,7 +445,7 @@ public class TestJobNodeConfigurationGenerator extends ExecutionPlannerTestBase
}
@Override
- public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
+ public void init(Context context) {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 6fa9ed1..1315912 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -19,13 +19,14 @@
package org.apache.samza.operators;
import com.google.common.collect.ImmutableSet;
-
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -40,7 +41,6 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.testUtils.StreamTestUtils;
import org.apache.samza.testUtils.TestClock;
@@ -56,10 +56,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.eq;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -304,22 +304,23 @@ public class TestJoinOperator {
mapConfig.put("job.id", "jobId");
StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", "instream");
StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", "instream2");
- Config config = new MapConfig(mapConfig);
- TaskContextImpl taskContext = mock(TaskContextImpl.class);
- when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+ Context context = new MockContext(new MapConfig(mapConfig));
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
new SystemStreamPartition("insystem", "instream2", new Partition(0))));
- when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ when(context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+ when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
// need to return different stores for left and right side
IntegerSerde integerSerde = new IntegerSerde();
TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));
- when(taskContext.getStore(eq("jobName-jobId-join-j1-L")))
+ when(context.getTaskContext().getStore(eq("jobName-jobId-join-j1-L")))
.thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
- when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
+ when(context.getTaskContext().getStore(eq("jobName-jobId-join-j1-R")))
.thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
- StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager(), clock);
- sot.init(config, taskContext);
+ StreamOperatorTask sot = new StreamOperatorTask(graphSpec.getOperatorSpecGraph(), clock);
+ sot.init(context);
return sot;
}
@@ -357,7 +358,7 @@ public class TestJoinOperator {
private int numCloseCalls = 0;
@Override
- public void init(Config config, TaskContext context) {
+ public void init(Context context) {
numInitCalls++;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 6d12d99..0ff2e0d 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -21,9 +21,9 @@ package org.apache.samza.operators.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -32,8 +32,8 @@ import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.anyLong;
@@ -46,14 +46,20 @@ import static org.mockito.Mockito.when;
public class TestOperatorImpl {
+ private Context context;
+
+ @Before
+ public void setup() { // fresh MockContext per test, with the task model and task metrics registry stubbed
+ context = new MockContext();
+ when(context.getTaskContext().getTaskModel()).thenReturn(mock(TaskModel.class));
+ when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ }
@Test(expected = IllegalStateException.class)
public void testMultipleInitShouldThrow() {
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- opImpl.init(mock(Config.class), mockTaskContext);
- opImpl.init(mock(Config.class), mockTaskContext);
+ opImpl.init(this.context);
+ opImpl.init(this.context);
}
@Test(expected = IllegalStateException.class)
@@ -64,24 +70,21 @@ public class TestOperatorImpl {
@Test
public void testOnMessagePropagatesResults() {
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(mock(Config.class), mockTaskContext);
+ opImpl.init(this.context);
// register a couple of operators
OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+ mockNextOpImpl1.init(this.context);
opImpl.registerNextOperator(mockNextOpImpl1);
OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+ mockNextOpImpl2.init(this.context);
opImpl.registerNextOperator(mockNextOpImpl2);
// send a message to this operator
@@ -96,9 +99,8 @@ public class TestOperatorImpl {
@Test
public void testOnMessageUpdatesMetrics() {
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+ when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry);
Counter mockCounter = mock(Counter.class);
Timer mockTimer = mock(Timer.class);
when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockCounter);
@@ -106,7 +108,7 @@ public class TestOperatorImpl {
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(mock(Config.class), mockTaskContext);
+ opImpl.init(this.context);
// send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
@@ -120,24 +122,21 @@ public class TestOperatorImpl {
@Test
public void testOnTimerPropagatesResultsAndTimer() {
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(mock(Config.class), mockTaskContext);
+ opImpl.init(this.context);
// register a couple of operators
OperatorImpl mockNextOpImpl1 = mock(OperatorImpl.class);
when(mockNextOpImpl1.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl1.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl1.init(mock(Config.class), mockTaskContext);
+ mockNextOpImpl1.init(this.context);
opImpl.registerNextOperator(mockNextOpImpl1);
OperatorImpl mockNextOpImpl2 = mock(OperatorImpl.class);
when(mockNextOpImpl2.getOperatorSpec()).thenReturn(new TestOpSpec());
when(mockNextOpImpl2.handleMessage(anyObject(), anyObject(), anyObject())).thenReturn(Collections.emptyList());
- mockNextOpImpl2.init(mock(Config.class), mockTaskContext);
+ mockNextOpImpl2.init(this.context);
opImpl.registerNextOperator(mockNextOpImpl2);
// send a timer tick to this operator
@@ -156,9 +155,8 @@ public class TestOperatorImpl {
@Test
public void testOnTimerUpdatesMetrics() {
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
+ when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(mockMetricsRegistry);
Counter mockMessageCounter = mock(Counter.class);
Timer mockTimer = mock(Timer.class);
when(mockMetricsRegistry.newCounter(anyString(), anyString())).thenReturn(mockMessageCounter);
@@ -166,7 +164,7 @@ public class TestOperatorImpl {
Object mockTestOpImplOutput = mock(Object.class);
OperatorImpl<Object, Object> opImpl = new TestOpImpl(mockTestOpImplOutput);
- opImpl.init(mock(Config.class), mockTaskContext);
+ opImpl.init(this.context);
// send a message to this operator
MessageCollector mockCollector = mock(MessageCollector.class);
@@ -188,7 +186,7 @@ public class TestOperatorImpl {
}
@Override
- protected void handleInit(Config config, TaskContext context) {}
+ protected void handleInit(Context context) {}
@Override
public Collection<Object> handleMessage(Object message,
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 3abd502..d760805 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -21,27 +21,16 @@ package org.apache.samza.operators.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import java.io.Serializable;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import org.apache.samza.Partition;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.container.SamzaContainerContext;
-import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
@@ -67,15 +56,28 @@ import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.testUtils.StreamTestUtils;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.TimestampedValue;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
@@ -84,7 +86,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestOperatorImplGraph {
-
private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
List<OperatorImpl> operators = new ArrayList<>();
operators.add(op);
@@ -193,25 +194,39 @@ public class TestOperatorImplGraph {
}
@Override
- public void init(Config config, TaskContext context) {
- if (perTaskFunctionMap.get(context.getTaskName()) == null) {
- perTaskFunctionMap.put(context.getTaskName(), new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
+ public void init(Context context) {
+ TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
+ if (perTaskFunctionMap.get(taskName) == null) {
+ perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
} else {
- if (perTaskFunctionMap.get(context.getTaskName()).containsKey(opId)) {
+ if (perTaskFunctionMap.get(taskName).containsKey(opId)) {
throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
}
- perTaskFunctionMap.get(context.getTaskName()).put(opId, this);
+ perTaskFunctionMap.get(taskName).put(opId, this);
}
- if (perTaskInitList.get(context.getTaskName()) == null) {
- perTaskInitList.put(context.getTaskName(), new ArrayList<String>() { { this.add(opId); } });
+ if (perTaskInitList.get(taskName) == null) {
+ perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
} else {
- perTaskInitList.get(context.getTaskName()).add(opId);
+ perTaskInitList.get(taskName).add(opId);
}
- this.taskName = context.getTaskName();
+ this.taskName = taskName;
this.numInitCalled++;
}
}
+ private Context context;
+
+ @Before
+ public void setup() {
+ TaskModel mockTaskModel = mock(TaskModel.class);
+ when(mockTaskModel.getTaskName()).thenReturn(new TaskName("task 0"));
+ context = new MockContext();
+ when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ when(context.getTaskContext().getTaskModel()).thenReturn(mockTaskModel);
+ // the default job config is a bare mock; individual tests can re-stub it if necessary
+ when(context.getJobContext().getConfig()).thenReturn(mock(Config.class));
+ }
+
@After
public void tearDown() {
BaseTestFunction.reset();
@@ -220,8 +235,7 @@ public class TestOperatorImplGraph {
@Test
public void testEmptyChain() {
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
- OperatorImplGraph opGraph =
- new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
+ OperatorImplGraph opGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), context, mock(Clock.class));
assertEquals(0, opGraph.getAllInputOperators().size());
}
@@ -242,6 +256,7 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
Config config = new MapConfig(configs);
+ when(this.context.getJobContext().getConfig()).thenReturn(config);
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
@@ -256,11 +271,8 @@ public class TestOperatorImplGraph {
.sendTo(outputStream);
}, config);
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -296,6 +308,7 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, outputPhysicalName);
Config config = new MapConfig(configs);
+ when(this.context.getJobContext().getConfig()).thenReturn(config);
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
@@ -312,21 +325,15 @@ public class TestOperatorImplGraph {
.sendTo(outputStream);
}, config);
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
JobModel jobModel = mock(JobModel.class);
ContainerModel containerModel = mock(ContainerModel.class);
TaskModel taskModel = mock(TaskModel.class);
when(jobModel.getContainers()).thenReturn(Collections.singletonMap("0", containerModel));
when(containerModel.getTasks()).thenReturn(Collections.singletonMap(new TaskName("task 0"), taskModel));
when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
- when(mockTaskContext.getJobModel()).thenReturn(jobModel);
- SamzaContainerContext containerContext =
- new SamzaContainerContext("0", config, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
- when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
+ when(((TaskContextImpl) this.context.getTaskContext()).getJobModel()).thenReturn(jobModel);
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -352,6 +359,7 @@ public class TestOperatorImplGraph {
HashMap<String, String> configMap = new HashMap<>();
StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName);
Config config = new MapConfig(configMap);
+ when(this.context.getJobContext().getConfig()).thenReturn(config);
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
@@ -360,10 +368,8 @@ public class TestOperatorImplGraph {
inputStream.map(mock(MapFunction.class));
}, config);
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream(inputSystem, inputPhysicalName));
assertEquals(2, inputOpImpl.registeredOperators.size());
@@ -377,10 +383,6 @@ public class TestOperatorImplGraph {
public void testMergeChain() {
String inputStreamId = "input";
String inputSystem = "input-system";
- String inputPhysicalName = "input-stream";
- HashMap<String, String> configs = new HashMap<>();
- StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, inputPhysicalName);
- Config config = new MapConfig(configs);
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
GenericInputDescriptor inputDescriptor = sd.getInputDescriptor(inputStreamId, mock(Serde.class));
@@ -390,13 +392,14 @@ public class TestOperatorImplGraph {
stream1.merge(Collections.singleton(stream2))
.map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m));
}, mock(Config.class));
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+
TaskName mockTaskName = mock(TaskName.class);
- when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(mockTaskName);
+ when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
Set<OperatorImpl> opSet = opImplGraph.getAllInputOperators().stream().collect(HashSet::new,
(s, op) -> addOperatorRecursively(s, op), HashSet::addAll);
@@ -423,6 +426,7 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, inputPhysicalName1);
StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, inputPhysicalName2);
Config config = new MapConfig(configs);
+ when(this.context.getJobContext().getConfig()).thenReturn(config);
Integer joinKey = new Integer(1);
Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
@@ -441,15 +445,16 @@ public class TestOperatorImplGraph {
}, config);
TaskName mockTaskName = mock(TaskName.class);
- TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
- when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
- when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(mockTaskName);
+ when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+
KeyValueStore mockLeftStore = mock(KeyValueStore.class);
- when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
+ when(this.context.getTaskContext().getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
KeyValueStore mockRightStore = mock(KeyValueStore.class);
- when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
+ when(this.context.getTaskContext().getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), config, mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, mock(Clock.class));
// verify that join function is initialized once.
assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1);
@@ -491,10 +496,12 @@ public class TestOperatorImplGraph {
String inputStreamId2 = "input2";
String inputSystem = "input-system";
Config mockConfig = mock(Config.class);
+
TaskName mockTaskName = mock(TaskName.class);
- TaskContextImpl mockContext = mock(TaskContextImpl.class);
- when(mockContext.getTaskName()).thenReturn(mockTaskName);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getTaskName()).thenReturn(mockTaskName);
+ when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
@@ -510,7 +517,7 @@ public class TestOperatorImplGraph {
.map(new TestMapFunction<Object, Object>("4", mapFn));
}, mockConfig);
- OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance());
+ OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, SystemClock.instance());
List<String> initializedOperators = BaseTestFunction.getInitListByTaskName(mockTaskName);
@@ -541,6 +548,7 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, streamId0, system, streamId0);
StreamTestUtils.addStreamConfigs(configs, streamId1, system, streamId1);
Config config = new MapConfig(configs);
+ when(this.context.getJobContext().getConfig()).thenReturn(config);
SystemStreamPartition ssp0 = new SystemStreamPartition(system, streamId0, new Partition(0));
SystemStreamPartition ssp1 = new SystemStreamPartition(system, streamId0, new Partition(1));
@@ -590,6 +598,7 @@ public class TestOperatorImplGraph {
StreamTestUtils.addStreamConfigs(configs, outputStreamId1, outputSystem, outputStreamId1);
StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2);
Config config = new MapConfig(configs);
+ when(this.context.getJobContext().getConfig()).thenReturn(config);
StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
@@ -640,14 +649,6 @@ public class TestOperatorImplGraph {
String inputSystem1 = "system1";
String inputSystem2 = "system2";
- HashMap<String, String> configs = new HashMap<>();
- configs.put(JobConfig.JOB_NAME(), "test-app");
- configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), inputSystem1);
- StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem1, inputStreamId1);
- StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem2, inputStreamId2);
- StreamTestUtils.addStreamConfigs(configs, inputStreamId3, inputSystem2, inputStreamId3);
- Config config = new MapConfig(configs);
-
SystemStream input1 = new SystemStream("system1", "intput1");
SystemStream input2 = new SystemStream("system2", "intput2");
SystemStream input3 = new SystemStream("system2", "intput3");
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index dc94e36..dfd8657 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,12 +18,10 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -69,9 +67,6 @@ public class TestSinkOperatorImpl {
private SinkOperatorImpl createSinkOperator(SinkFunction<TestOutputMessageEnvelope> sinkFn) {
SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- return new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
+ return new SinkOperatorImpl<>(sinkOp);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 873cd3c..ae05305 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,13 +18,11 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.config.Config;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -45,8 +43,6 @@ public class TestStreamOperatorImpl {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
new StreamOperatorImpl<>(mockOp);
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
@@ -65,8 +61,6 @@ public class TestStreamOperatorImpl {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
new StreamOperatorImpl<>(mockOp);
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index d8b2e8d..9083495 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -18,10 +18,10 @@
*/
package org.apache.samza.operators.impl;
-import java.util.Collection;
-
+import junit.framework.Assert;
import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
@@ -29,11 +29,10 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.TableSpec;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import junit.framework.Assert;
+import java.util.Collection;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -75,18 +74,16 @@ public class TestStreamTableJoinOperatorImpl {
return record.getKey();
}
});
- Config config = mock(Config.class);
ReadableTable table = mock(ReadableTable.class);
when(table.get("1")).thenReturn("r1");
when(table.get("2")).thenReturn(null);
- TaskContext mockTaskContext = mock(TaskContext.class);
- when(mockTaskContext.getTable(tableId)).thenReturn(table);
+ Context context = new MockContext();
+ when(context.getTaskContext().getTable(tableId)).thenReturn(table);
MessageCollector mockMessageCollector = mock(MessageCollector.class);
TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
- StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(
- mockJoinOpSpec, config, mockTaskContext);
+ StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(mockJoinOpSpec, context);
// Table has the key
Collection<TestMessageEnvelope> result;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 7d468c9..20d5e25 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -30,13 +30,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
+import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
@@ -67,8 +69,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -77,41 +77,41 @@ import static org.mockito.Mockito.when;
public class TestWindowOperator {
private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+ private Context context;
private Config config;
- private TaskContextImpl taskContext;
@Before
- public void setup() throws Exception {
- config = mock(Config.class);
- when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
- when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
- taskContext = mock(TaskContextImpl.class);
+ public void setup() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("job.default.system", "kafka");
+ configMap.put("job.name", "jobName");
+ configMap.put("job.id", "jobId");
+ this.config = new MapConfig(configMap);
+
+ this.context = new MockContext();
+ when(this.context.getJobContext().getConfig()).thenReturn(this.config);
Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
- when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+ TaskModel taskModel = mock(TaskModel.class);
+ when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("kafka", "integTestExecutionPlannerers", new Partition(0))));
- when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- when(taskContext.getStore("jobName-jobId-window-w1"))
+ when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
+ when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+ when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
.thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
-
- Map<String, String> mapConfig = new HashMap<>();
- mapConfig.put("job.default.system", "kafka");
- mapConfig.put("job.name", "jobName");
- mapConfig.put("job.id", "jobId");
- config = new MapConfig(mapConfig);
}
@Test
public void testTumblingWindowsDiscardingMode() throws Exception {
-
OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
@@ -143,8 +143,8 @@ public class TestWindowOperator {
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -163,8 +163,7 @@ public class TestWindowOperator {
@Test
public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
-
- when(taskContext.getStore("jobName-jobId-window-w1"))
+ when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
.thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
OperatorSpecGraph sgb = this.getAggregateTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
@@ -172,8 +171,8 @@ public class TestWindowOperator {
List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
testClock.advanceTime(Duration.ofSeconds(1));
@@ -193,8 +192,8 @@ public class TestWindowOperator {
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -222,8 +221,8 @@ public class TestWindowOperator {
this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -267,12 +266,12 @@ public class TestWindowOperator {
OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING,
Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- task.init(config, taskContext);
+ task.init(this.context);
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -299,8 +298,8 @@ public class TestWindowOperator {
OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING,
Duration.ofSeconds(1), Triggers.count(2)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
MessageCollector messageCollector =
@@ -343,8 +342,8 @@ public class TestWindowOperator {
OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))).getOperatorSpecGraph();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
MessageCollector messageCollector =
@@ -406,8 +405,8 @@ public class TestWindowOperator {
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -439,17 +438,18 @@ public class TestWindowOperator {
EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
"integers", new Partition(0))), Collections.emptyMap());
- when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
- when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
- when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+ when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
+ endOfStreamStates);
+ when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
+ mock(WatermarkStates.class));
OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -480,16 +480,17 @@ public class TestWindowOperator {
EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
"integers", new Partition(0))), Collections.emptyMap());
- when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
- when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
- when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+ when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
+ endOfStreamStates);
+ when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
+ mock(WatermarkStates.class));
OperatorSpecGraph sgb =
this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -517,16 +518,17 @@ public class TestWindowOperator {
EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
"integers", new Partition(0))), Collections.emptyMap());
- when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
- when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
- when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+ when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(EndOfStreamStates.class.getName())).thenReturn(
+ endOfStreamStates);
+ when(((TaskContextImpl) this.context.getTaskContext()).fetchObject(WatermarkStates.class.getName())).thenReturn(
+ mock(WatermarkStates.class));
OperatorSpecGraph sgb =
this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
- task.init(config, taskContext);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, testClock);
+ task.init(this.context);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index 860e630..6e91e2a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -31,9 +31,9 @@ import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index e1342e3..fd4a7fb 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -23,9 +23,9 @@ import java.util.Map;
import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
import org.apache.samza.operators.functions.MapFunction;
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index 41973b2..b73f8e3 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -20,16 +20,16 @@
package org.apache.samza.operators.spec;
import org.apache.samza.operators.Scheduler;
-import org.apache.samza.operators.functions.ScheduledFunction;
-import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.serializers.Serde;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;
+import org.apache.samza.serializers.Serde;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -38,7 +38,8 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
public class TestWindowOperatorSpec {
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 93b157a..b002e2a 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -49,7 +49,8 @@ import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -465,8 +466,10 @@ public class TestStreamProcessor {
@Test
public void testStreamProcessorWithStreamProcessorListenerFactory() {
AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new AtomicReference<>();
- StreamProcessor streamProcessor = new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class),
- sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)), mock(JobCoordinator.class));
+ StreamProcessor streamProcessor =
+ new StreamProcessor(mock(Config.class), new HashMap<>(), mock(TaskFactory.class), null, null,
+ sp -> mockListener.updateAndGet(old -> new MockStreamProcessorLifecycleListener(sp)),
+ mock(JobCoordinator.class));
assertEquals(streamProcessor, mockListener.get().processor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
index d483ae6..8eff4ad 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
@@ -20,8 +20,8 @@
package org.apache.samza.storage;
import java.io.File;
-
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.SystemStreamPartition;
@@ -29,9 +29,15 @@ import org.apache.samza.task.MessageCollector;
public class MockStorageEngineFactory implements StorageEngineFactory<Object, Object> {
@Override
- public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde,
- MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition,
- SamzaContainerContext containerContext) {
+ public StorageEngine getStorageEngine(String storeName,
+ File storeDir,
+ Serde<Object> keySerde,
+ Serde<Object> msgSerde,
+ MessageCollector collector,
+ MetricsRegistry registry,
+ SystemStreamPartition changeLogSystemStreamPartition,
+ JobContext jobContext,
+ ContainerContext containerContext) {
StoreProperties storeProperties = new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build();
return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition, storeProperties);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index 42f05c0..0952a87 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -18,25 +18,23 @@
*/
package org.apache.samza.table;
-import java.lang.reflect.Field;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
+import junit.framework.Assert;
import org.apache.samza.SamzaException;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.MockContext;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.StorageEngine;
-import org.apache.samza.task.TaskContext;
import org.junit.Test;
-import junit.framework.Assert;
+import java.lang.reflect.Field;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
@@ -122,11 +120,11 @@ public class TestTableManager {
});
TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
- tableManager.init(mock(SamzaContainerContext.class), mock(TaskContext.class));
+ tableManager.init(new MockContext());
for (int i = 0; i < 2; i++) {
Table table = tableManager.getTable(TABLE_ID);
- verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject(), anyObject());
+ verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject());
verify(DummyTableProviderFactory.tableProvider, times(1)).getTable();
Assert.assertEquals(DummyTableProviderFactory.table, table);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index ec1c915..dc13d00 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -19,19 +19,11 @@
package org.apache.samza.table.caching;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
@@ -45,17 +37,24 @@ import org.apache.samza.table.TableSpec;
import org.apache.samza.table.caching.guava.GuavaCacheTable;
import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor;
import org.apache.samza.table.caching.guava.GuavaCacheTableProvider;
-import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -139,15 +138,14 @@ public class TestCachingTable {
}
private void initTables(ReadableTable ... tables) {
- SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
- TaskContext taskContext = mock(TaskContext.class);
+ Context context = new MockContext();
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString());
doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString());
doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any());
- when(taskContext.getMetricsRegistry()).thenReturn(metricsRegistry);
+ when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(metricsRegistry);
for (ReadableTable table : tables) {
- table.init(containerContext, taskContext);
+ table.init(context);
}
}
@@ -160,9 +158,7 @@ public class TestCachingTable {
}
CachingTableProvider tableProvider = new CachingTableProvider(desc.getTableSpec());
- SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
-
- TaskContext taskContext = mock(TaskContext.class);
+ Context context = new MockContext();
final ReadWriteTable cacheTable = getMockCache().getLeft();
final ReadWriteTable realTable = mock(ReadWriteTable.class);
@@ -185,11 +181,11 @@ public class TestCachingTable {
Assert.fail();
return null;
- }).when(taskContext).getTable(anyString());
+ }).when(context.getTaskContext()).getTable(anyString());
- when(taskContext.getMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
+ when(context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new NoOpMetricsRegistry());
- tableProvider.init(containerContext, taskContext);
+ tableProvider.init(context);
CachingTable cachingTable = (CachingTable) tableProvider.getTable();
http://git-wip-us.apache.org/repos/asf/samza/blob/9d2d49e9/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 3e844c3..571f87b 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -19,17 +19,9 @@
package org.apache.samza.table.remote;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.samza.container.SamzaContainerContext;
+import junit.framework.Assert;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
@@ -38,11 +30,18 @@ import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
-import org.apache.samza.task.TaskContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import junit.framework.Assert;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollection;
@@ -57,14 +56,14 @@ import static org.mockito.Mockito.verify;
public class TestRemoteTable {
private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
- public static TaskContext getMockTaskContext() {
+ public static Context getMockContext() {
+ Context context = new MockContext();
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
- TaskContext taskContext = mock(TaskContext.class);
- doReturn(metricsRegistry).when(taskContext).getMetricsRegistry();
- return taskContext;
+ doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+ return context;
}
private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
@@ -89,11 +88,9 @@ public class TestRemoteTable {
table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
}
- TaskContext taskContext = getMockTaskContext();
-
- SamzaContainerContext containerContext = mock(SamzaContainerContext.class);
+ Context context = getMockContext();
- table.init(containerContext, taskContext);
+ table.init(context);
return (T) table;
}