You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/02/25 21:39:44 UTC
[samza] branch master updated: Ignoring standby tasks when grouping
This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2c86eed Ignoring standby tasks when grouping
2c86eed is described below
commit 2c86eed8943096cab0bf99cdd037f1cfe48227b0
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Mon Feb 25 13:39:40 2019 -0800
Ignoring standby tasks when grouping
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: shanthoosh
Closes #931 from rmatharu/test-standby-grouper-fix and squashes the following commits:
767ca92e [Ray Matharu] Updating test
4eab3c18 [Ray Matharu] Minor
1394a6d7 [Ray Matharu] Ignoring standby tasks when grouping
96e3d8f3 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
40f68a61 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
497602ab [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
1a72dc48 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
36c0b339 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
2c09b081 [Ray Matharu] Rocksdb bug fix
---
.../org/apache/samza/coordinator/JobModelManager.scala | 14 +++++++++-----
.../org/apache/samza/coordinator/TestJobModelManager.java | 2 +-
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 3243947..a34f976 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -110,7 +110,7 @@ object JobModelManager extends Logging {
val processorLocality: util.Map[String, LocationId] = getProcessorLocality(config, localityManager)
val taskModes: util.Map[TaskName, TaskMode] = taskAssignmentManager.readTaskModes()
- // we read the taskAssignment only for ActiveTasks
+ // We read the taskAssignment only for ActiveTasks
val taskAssignment: util.Map[String, String] = taskAssignmentManager.readTaskAssignment().
filterKeys(taskName => taskModes.get(new TaskName(taskName)).eq(TaskMode.Active))
@@ -136,10 +136,14 @@ object JobModelManager extends Logging {
sspToTaskMapping foreach { case (systemStreamPartition: SystemStreamPartition, taskNames: util.List[String]) =>
for (task <- taskNames) {
val taskName: TaskName = new TaskName(task)
- if (!taskPartitionAssignments.containsKey(taskName)) {
- taskPartitionAssignments.put(taskName, new util.ArrayList[SystemStreamPartition]())
+
+ // We read the partition assignments only for active-tasks
+ if (taskModes.get(taskName).eq(TaskMode.Active)) {
+ if (!taskPartitionAssignments.containsKey(taskName)) {
+ taskPartitionAssignments.put(taskName, new util.ArrayList[SystemStreamPartition]())
+ }
+ taskPartitionAssignments.get(taskName).add(systemStreamPartition)
}
- taskPartitionAssignments.get(taskName).add(systemStreamPartition)
}
}
new GrouperMetadataImpl(processorLocality, taskLocality, taskPartitionAssignments, taskNameToProcessorId)
@@ -216,7 +220,7 @@ object JobModelManager extends Logging {
// if the set of standby tasks has changed, e.g., when the replication-factor changed, or the active-tasks-set has
// changed, we log a warning and delete the existing mapping for these tasks
val previousStandbyTasks = taskAssignmentManager.readTaskModes().filter(x => x._2.eq(TaskMode.Standby))
- if(standbyTaskNames.asScala != previousStandbyTasks.keySet) {
+ if(standbyTaskNames.asScala.eq(previousStandbyTasks.keySet)) {
info("The set of standby tasks has changed, current standby tasks %s, previous standby tasks %s" format (standbyTaskNames, previousStandbyTasks.keySet))
taskAssignmentManager.deleteTaskContainerMappings(previousStandbyTasks.map(x => x._1.getTaskName).asJava)
}
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 6e627b0..bf7f16f 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -186,7 +186,7 @@ public class TestJobModelManager {
// Mock the container to task assignment.
when(mockTaskAssignmentManager.readTaskAssignment()).thenReturn(taskAssignment);
- when(mockTaskAssignmentManager.readTaskModes()).thenReturn(Collections.singletonMap(new TaskName("task-0"), TaskMode.Active));
+ when(mockTaskAssignmentManager.readTaskModes()).thenReturn(ImmutableMap.of(new TaskName("task-0"), TaskMode.Active, new TaskName("task-1"), TaskMode.Active, new TaskName("task-2"), TaskMode.Active, new TaskName("task-3"), TaskMode.Active));
GrouperMetadataImpl grouperMetadata = JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);