You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/08 02:02:39 UTC

samza git commit: Fix test failures caused by PR345

Repository: samza
Updated Branches:
  refs/heads/master adfc4bfc4 -> d806e9dab


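Fix test failures caused by PR345: OperatorImpl now tolerates a null JobModel
on the task context (taskModel is left null and the operator is marked as used
in the current task), and TestOperatorImplGraph mocks the ContainerModel,
TaskModel and SamzaContainerContext that the JobModel lookup requires.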


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d806e9da
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d806e9da
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d806e9da

Branch: refs/heads/master
Commit: d806e9dab12c2299c55fa6e5e95d640f3a008c28
Parents: adfc4bf
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Tue Nov 7 18:00:03 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Nov 7 18:00:03 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/operators/impl/OperatorImpl.java   | 12 +++++++++---
 .../samza/operators/impl/TestOperatorImplGraph.java     | 10 +++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d806e9da/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 96dcd89..92a563a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -111,9 +111,15 @@ public abstract class OperatorImpl<M, RM> {
     TaskContextImpl taskContext = (TaskContextImpl) context;
     this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
     this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
-    ContainerModel containerModel = taskContext.getJobModel().getContainers()
-        .get(context.getSamzaContainerContext().id);
-    this.taskModel = containerModel.getTasks().get(taskName);
+
+    if (taskContext.getJobModel() != null) {
+      ContainerModel containerModel = taskContext.getJobModel().getContainers()
+          .get(context.getSamzaContainerContext().id);
+      this.taskModel = containerModel.getTasks().get(taskName);
+    } else {
+      this.taskModel = null;
+      this.usedInCurrentTask = true;
+    }
 
     handleInit(config, context);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/d806e9da/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 47e55a8..3f48cf2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -30,6 +30,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
@@ -149,8 +150,15 @@ public class TestOperatorImplGraph {
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     JobModel jobModel = mock(JobModel.class);
-    when(jobModel.getContainers()).thenReturn(Collections.EMPTY_MAP);
+    ContainerModel containerModel = mock(ContainerModel.class);
+    TaskModel taskModel = mock(TaskModel.class);
+    when(jobModel.getContainers()).thenReturn(Collections.singletonMap("0", containerModel));
+    when(containerModel.getTasks()).thenReturn(Collections.singletonMap(new TaskName("task 0"), taskModel));
+    when(taskModel.getSystemStreamPartitions()).thenReturn(Collections.emptySet());
     when(mockTaskContext.getJobModel()).thenReturn(jobModel);
+    SamzaContainerContext containerContext =
+        new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")));
+    when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));