You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/02/25 21:39:44 UTC

[samza] branch master updated: Ignoring standby tasks when grouping

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c86eed  Ignoring standby tasks when grouping
2c86eed is described below

commit 2c86eed8943096cab0bf99cdd037f1cfe48227b0
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Mon Feb 25 13:39:40 2019 -0800

    Ignoring standby tasks when grouping
    
    Author: Ray Matharu <rm...@linkedin.com>
    
    Reviewers: shanthoosh
    
    Closes #931 from rmatharu/test-standby-grouper-fix and squashes the following commits:
    
    767ca92e [Ray Matharu] Updating test
    4eab3c18 [Ray Matharu] Minor
    1394a6d7 [Ray Matharu] Igoring standby tasks when grouping
    96e3d8f3 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    40f68a61 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    497602ab [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    1a72dc48 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    36c0b339 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    2c09b081 [Ray Matharu] Rocksdb bug fix
---
 .../org/apache/samza/coordinator/JobModelManager.scala     | 14 +++++++++-----
 .../org/apache/samza/coordinator/TestJobModelManager.java  |  2 +-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 3243947..a34f976 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -110,7 +110,7 @@ object JobModelManager extends Logging {
     val processorLocality: util.Map[String, LocationId] = getProcessorLocality(config, localityManager)
     val taskModes: util.Map[TaskName, TaskMode] = taskAssignmentManager.readTaskModes()
 
-    // we read the taskAssignment only for ActiveTasks
+    // We read the taskAssignment only for ActiveTasks
     val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment().
       filterKeys(taskName => taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
 
@@ -136,10 +136,14 @@ object JobModelManager extends Logging {
     sspToTaskMapping foreach { case (systemStreamPartition: SystemStreamPartition, taskNames: util.List[String]) =>
       for (task <- taskNames) {
         val taskName: TaskName = new TaskName(task)
-        if (!taskPartitionAssignments.containsKey(taskName)) {
-          taskPartitionAssignments.put(taskName, new util.ArrayList[SystemStreamPartition]())
+
+        // We read the partition assignments only for active-tasks
+        if (taskModes.get(taskName).eq(TaskMode.Active)) {
+          if (!taskPartitionAssignments.containsKey(taskName)) {
+            taskPartitionAssignments.put(taskName, new util.ArrayList[SystemStreamPartition]())
+          }
+          taskPartitionAssignments.get(taskName).add(systemStreamPartition)
         }
-        taskPartitionAssignments.get(taskName).add(systemStreamPartition)
       }
     }
     new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId)
@@ -216,7 +220,7 @@ object JobModelManager extends Logging {
     // if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
     // changed, we log a warning and delete the existing mapping for these tasks
     val previousStandbyTasks = taskAssignmentManager.readTaskModes().filter(x => x._2.eq(TaskMode.Standby))
-    if(standbyTaskNames.asScala != previousStandbyTasks.keySet) {
+    if(standbyTaskNames.asScala.eq(previousStandbyTasks.keySet)) {
       info("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s" format (standbyTaskNames, previousStandbyTasks.keySet))
       taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => x._1.getTaskName).asJava)
     }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 6e627b0..bf7f16f 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -186,7 +186,7 @@ public class TestJobModelManager {
 
     // Mock the container to task assignment.
     when(mockTaskAssignmentManager.readTaskAssignment()).thenReturn(taskAssignment);
-    when(mockTaskAssignmentManager.readTaskModes()).thenReturn(Collections.singletonMap(new TaskName("task-0"), TaskMode.Active));
+    when(mockTaskAssignmentManager.readTaskModes()).thenReturn(ImmutableMap.of(new TaskName("task-0"), TaskMode.Active, new TaskName("task-1"), TaskMode.Active, new TaskName("task-2"), TaskMode.Active, new TaskName("task-3"), TaskMode.Active));
 
     GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);