You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/05/12 15:17:09 UTC
samza git commit: SAMZA-947: TaskAssignmentManager registration
exception when partition count changes.
Repository: samza
Updated Branches:
refs/heads/master 9d3a68794 -> 78a85047c
SAMZA-947: TaskAssignmentManager registration exception when partition count changes.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/78a85047
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/78a85047
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/78a85047
Branch: refs/heads/master
Commit: 78a85047c71d53ad9ad74e5eed5b9ccf1d9cce36
Parents: 9d3a687
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 12 08:16:46 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu May 12 08:16:46 2016 -0700
----------------------------------------------------------------------
.../container/grouper/task/GroupByContainerCount.java | 1 -
.../container/grouper/task/TaskAssignmentManager.java | 12 +++++++++---
.../grouper/task/TestTaskAssignmentManager.java | 10 +++-------
3 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/78a85047/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 286ea1b..5e6ccf8 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -176,7 +176,6 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* @param taskAssignmentManager the {@link TaskAssignmentManager} that will be used to save the mappings.
*/
private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
- taskAssignmentManager.register(null);
for (ContainerModel container : containers) {
for (TaskName taskName : container.getTasks().keySet()) {
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getContainerId());
http://git-wip-us.apache.org/repos/asf/samza/blob/78a85047/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 0cbdec8..6473dfb 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
+ private boolean registered = false;
/**
* Default constructor that creates a read-write manager
@@ -49,13 +50,18 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
public TaskAssignmentManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaTaskAssignmentManager");
+ register(null);
}
@Override
public void register(TaskName taskName) {
- // taskName will not be used. This producer is global scope.
- registerCoordinatorStreamConsumer();
- registerCoordinatorStreamProducer(getSource());
+ if (!registered) {
+ // taskName will not be used. This producer is global scope.
+ registerCoordinatorStreamProducer(getSource());
+ // We don't register the consumer because we don't manage the consumer's
+ // lifecycle. Also, we don't need to set any properties on the consumer.
+ registered = true;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/78a85047/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 19ab78e..d813fdc 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
@@ -61,12 +60,11 @@ public class TestTaskAssignmentManager {
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
MockCoordinatorStreamSystemConsumer consumer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+ consumer.register();
TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
- taskAssignmentManager.register(new TaskName("ignoredTaskName"));
assertTrue(producer.isRegistered());
assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
- assertTrue(consumer.isRegistered());
taskAssignmentManager.start();
assertTrue(producer.isStarted());
@@ -101,12 +99,11 @@ public class TestTaskAssignmentManager {
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
MockCoordinatorStreamSystemConsumer consumer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+ consumer.register();
TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
- taskAssignmentManager.register(new TaskName("ignoredTaskName"));
assertTrue(producer.isRegistered());
assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
- assertTrue(consumer.isRegistered());
taskAssignmentManager.start();
assertTrue(producer.isStarted());
@@ -141,12 +138,11 @@ public class TestTaskAssignmentManager {
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
MockCoordinatorStreamSystemConsumer consumer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+ consumer.register();
TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
- taskAssignmentManager.register(new TaskName("ignoredTaskName"));
assertTrue(producer.isRegistered());
assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
- assertTrue(consumer.isRegistered());
taskAssignmentManager.start();
assertTrue(producer.isStarted());