You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/08 02:02:39 UTC
samza git commit: Fix test failures caused by PR345
Repository: samza
Updated Branches:
refs/heads/master adfc4bfc4 -> d806e9dab
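Fix test failures caused by PR345: OperatorImpl now tolerates a null JobModel from the task context, and TestOperatorImplGraph mocks the container model, task model and container context that OperatorImpl looks up.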
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d806e9da
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d806e9da
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d806e9da
Branch: refs/heads/master
Commit: d806e9dab12c2299c55fa6e5e95d640f3a008c28
Parents: adfc4bf
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Tue Nov 7 18:00:03 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Nov 7 18:00:03 2017 -0800
----------------------------------------------------------------------
.../org/apache/samza/operators/impl/OperatorImpl.java | 12 +++++++++---
.../samza/operators/impl/TestOperatorImplGraph.java | 10 +++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d806e9da/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 96dcd89..92a563a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -111,9 +111,15 @@ public abstract class OperatorImpl<M, RM> {
TaskContextImpl taskContext = (TaskContextImpl) context;
this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
- ContainerModel containerModel = taskContext.getJobModel().getContainers()
- .get(context.getSamzaContainerContext().id);
- this.taskModel = containerModel.getTasks().get(taskName);
+
+ if (taskContext.getJobModel() != null) {
+ ContainerModel containerModel = taskContext.getJobModel().getContainers()
+ .get(context.getSamzaContainerContext().id);
+ this.taskModel = containerModel.getTasks().get(taskName);
+ } else {
+ this.taskModel = null;
+ this.usedInCurrentTask = true;
+ }
handleInit(config, context);
http://git-wip-us.apache.org/repos/asf/samza/blob/d806e9da/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 47e55a8..3f48cf2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -30,6 +30,7 @@ import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
@@ -149,8 +150,15 @@ public class TestOperatorImplGraph {
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
JobModel jobModel = mock(JobModel.class);
- when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP);
+ ContainerModel containerModel = mock(ContainerModel.class);
+ TaskModel taskModel = mock(TaskModel.class);
+ when(jobModel.getContainers()).thenReturn(Collections.singletonMap("0", containerModel));
+ when(containerModel.getTasks()).thenReturn(Collections.singletonMap(new TaskName("task 0"), taskModel));
+ when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
when(mockTaskContext.getJobModel()).thenReturn(jobModel);
+ SamzaContainerContext containerContext =
+ new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")));
+ when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
OperatorImplGraph opImplGraph =
new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));