You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/12 15:17:09 UTC

samza git commit: SAMZA-947: TaskAssignmentManager registration exception when partition count changes.

Repository: samza
Updated Branches:
  refs/heads/master 9d3a68794 -> 78a85047c


SAMZA-947: TaskAssignmentManager registration exception when partition count changes.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/78a85047
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/78a85047
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/78a85047

Branch: refs/heads/master
Commit: 78a85047c71d53ad9ad74e5eed5b9ccf1d9cce36
Parents: 9d3a687
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 12 08:16:46 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu May 12 08:16:46 2016 -0700

----------------------------------------------------------------------
 .../container/grouper/task/GroupByContainerCount.java   |  1 -
 .../container/grouper/task/TaskAssignmentManager.java   | 12 +++++++++---
 .../grouper/task/TestTaskAssignmentManager.java         | 10 +++-------
 3 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/78a85047/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 286ea1b..5e6ccf8 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -176,7 +176,6 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    * @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to save the mappings.
    */
   private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
-    taskAssignmentManager.register(null);
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
         taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getContainerId());

http://git-wip-us.apache.org/repos/asf/samza/blob/78a85047/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 0cbdec8..6473dfb 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
   private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
+  private boolean registered = false;
 
   /**
    * Default constructor that creates a read-write manager
@@ -49,13 +50,18 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
   public TaskAssignmentManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
                          CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
     super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaTaskAssignmentManager");
+    register(null);
   }
 
   @Override
   public void register(TaskName taskName) {
-    // taskName will not be used. This producer is global scope.
-    registerCoordinatorStreamConsumer();
-    registerCoordinatorStreamProducer(getSource());
+    if (!registered) {
+      // taskName will not be used. This producer is global scope.
+      registerCoordinatorStreamProducer(getSource());
+      // We don't register the consumer because we don't manage the consumer's
+      // lifecycle. Also, we don't need to set any properties on the consumer.
+      registered = true;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/78a85047/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 19ab78e..d813fdc 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
@@ -61,12 +60,11 @@ public class TestTaskAssignmentManager {
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+    consumer.register();
     TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
 
-    taskAssignmentManager.register(new TaskName("ignoredTaskName"));
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
-    assertTrue(consumer.isRegistered());
 
     taskAssignmentManager.start();
     assertTrue(producer.isStarted());
@@ -101,12 +99,11 @@ public class TestTaskAssignmentManager {
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+    consumer.register();
     TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
 
-    taskAssignmentManager.register(new TaskName("ignoredTaskName"));
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
-    assertTrue(consumer.isRegistered());
 
     taskAssignmentManager.start();
     assertTrue(producer.isStarted());
@@ -141,12 +138,11 @@ public class TestTaskAssignmentManager {
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+    consumer.register();
     TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
 
-    taskAssignmentManager.register(new TaskName("ignoredTaskName"));
     assertTrue(producer.isRegistered());
     assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
-    assertTrue(consumer.isRegistered());
 
     taskAssignmentManager.start();
     assertTrue(producer.isStarted());