You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/05/06 19:40:04 UTC
samza git commit: SAMZA-946 - ConcurrentModificationException in
TaskAssignmentManager when partition count changes.
Repository: samza
Updated Branches:
refs/heads/master 5a673604a -> 9d3a68794
SAMZA-946 - ConcurrentModificationException in TaskAssignmentManager when partition count changes.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d3a6879
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d3a6879
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d3a6879
Branch: refs/heads/master
Commit: 9d3a687948d8e16550a91d8a560c88a3ac5e9e50
Parents: 5a67360
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri May 6 12:32:57 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri May 6 12:32:57 2016 -0700
----------------------------------------------------------------------
.../samza/config/DefaultChooserConfig.java | 2 +-
.../grouper/task/TaskAssignmentManager.java | 21 +++-------
.../grouper/task/TestTaskAssignmentManager.java | 40 ++++++++++++++++++++
3 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
index d242d14..237c6f9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -64,7 +64,7 @@ public class DefaultChooserConfig extends MapConfig {
/**
* Gets the priority of every SystemStream for which the priority
- * was explicitly configured with a value >=0.
+ * was explicitly configured with a value >=0.
*
* @return the explicitly-configured stream priorities as a map from
* SystemStream to the configured priority value. Streams that
http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index ec5cf3d..0cbdec8 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* */
public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
- private Map<String, Integer> taskNameToContainerId = new HashMap<>();
+ private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
/**
* Default constructor that creates a read-write manager
@@ -51,16 +51,6 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaTaskAssignmentManager");
}
- /**
- * Special constructor that creates a write-only {@link TaskAssignmentManager} that only writes
- * to coordinator stream in SamzaContainer
- *
- * @param coordinatorStreamSystemProducer producer to the coordinator stream
- */
- public TaskAssignmentManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) {
- this(coordinatorStreamSystemProducer, null);
- }
-
@Override
public void register(TaskName taskName) {
// taskName will not be used. This producer is global scope.
@@ -75,24 +65,23 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
* @return the map of taskName: containerId
*/
public Map<String, Integer> readTaskAssignment() {
- Map<String, Integer> allMappings = new HashMap<>();
+ taskNameToContainerId.clear();
for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
if (message.isDelete()) {
- allMappings.remove(message.getKey());
+ taskNameToContainerId.remove(message.getKey());
log.debug("Got TaskContainerMapping delete message: {}", message);
} else {
SetTaskContainerMapping mapping = new SetTaskContainerMapping(message);
- allMappings.put(mapping.getKey(), mapping.getTaskAssignment());
+ taskNameToContainerId.put(mapping.getKey(), mapping.getTaskAssignment());
log.debug("Got TaskContainerMapping message: {}", mapping);
}
}
- taskNameToContainerId = allMappings;
for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) {
log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
}
- return Collections.unmodifiableMap(allMappings);
+ return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId));
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 7f83494..19ab78e 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -96,6 +96,46 @@ public class TestTaskAssignmentManager {
assertTrue(consumer.isStopped());
}
+ @Test public void testDeleteMappings() throws Exception {
+ MockCoordinatorStreamSystemProducer producer =
+ mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+ MockCoordinatorStreamSystemConsumer consumer =
+ mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+ TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
+
+ taskAssignmentManager.register(new TaskName("ignoredTaskName"));
+ assertTrue(producer.isRegistered());
+ assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
+ assertTrue(consumer.isRegistered());
+
+ taskAssignmentManager.start();
+ assertTrue(producer.isStarted());
+ assertTrue(consumer.isStarted());
+
+ Map<String, Integer> expectedMap =
+ new HashMap<String, Integer>() {
+ {
+ this.put("Task0", new Integer(0));
+ this.put("Task1", new Integer(1));
+ }
+ };
+
+ for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) {
+ taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
+ }
+
+ Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+ assertEquals(expectedMap, localMap);
+
+ taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());
+ Map<String, Integer> deletedMap = taskAssignmentManager.readTaskAssignment();
+ assertTrue(deletedMap.isEmpty());
+
+ taskAssignmentManager.stop();
+ assertTrue(producer.isStopped());
+ assertTrue(consumer.isStopped());
+ }
+
@Test public void testTaskAssignmentManagerEmptyCoordinatorStream() throws Exception {
MockCoordinatorStreamSystemProducer producer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);