You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/05/06 19:40:04 UTC

samza git commit: SAMZA-946 - ConcurrentModificationException in TaskAssignmentManager when partition count changes.

Repository: samza
Updated Branches:
  refs/heads/master 5a673604a -> 9d3a68794


SAMZA-946 - ConcurrentModificationException in TaskAssignmentManager when partition count changes.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d3a6879
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d3a6879
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d3a6879

Branch: refs/heads/master
Commit: 9d3a687948d8e16550a91d8a560c88a3ac5e9e50
Parents: 5a67360
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri May 6 12:32:57 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri May 6 12:32:57 2016 -0700

----------------------------------------------------------------------
 .../samza/config/DefaultChooserConfig.java      |  2 +-
 .../grouper/task/TaskAssignmentManager.java     | 21 +++-------
 .../grouper/task/TestTaskAssignmentManager.java | 40 ++++++++++++++++++++
 3 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
index d242d14..237c6f9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -64,7 +64,7 @@ public class DefaultChooserConfig extends MapConfig {
 
   /**
    * Gets the priority of every SystemStream for which the priority
-   * was explicitly configured with a value &gt=0.
+   * was explicitly configured with a value &gt;=0.
    *
    * @return  the explicitly-configured stream priorities as a map from
    *          SystemStream to the configured priority value. Streams that

http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index ec5cf3d..0cbdec8 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * */
 public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
-  private Map<String, Integer> taskNameToContainerId = new HashMap<>();
+  private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
 
   /**
    * Default constructor that creates a read-write manager
@@ -51,16 +51,6 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
     super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaTaskAssignmentManager");
   }
 
-  /**
-   * Special constructor that creates a write-only {@link TaskAssignmentManager} that only writes
-   * to coordinator stream in SamzaContainer
-   *
-   * @param coordinatorStreamSystemProducer producer to the coordinator stream
-   */
-  public TaskAssignmentManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) {
-    this(coordinatorStreamSystemProducer, null);
-  }
-
   @Override
   public void register(TaskName taskName) {
     // taskName will not be used. This producer is global scope.
@@ -75,24 +65,23 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
    * @return the map of taskName: containerId
    */
   public Map<String, Integer> readTaskAssignment() {
-    Map<String, Integer> allMappings = new HashMap<>();
+    taskNameToContainerId.clear();
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
       if (message.isDelete()) {
-        allMappings.remove(message.getKey());
+        taskNameToContainerId.remove(message.getKey());
         log.debug("Got TaskContainerMapping delete message: {}", message);
       } else {
         SetTaskContainerMapping mapping = new SetTaskContainerMapping(message);
-        allMappings.put(mapping.getKey(), mapping.getTaskAssignment());
+        taskNameToContainerId.put(mapping.getKey(), mapping.getTaskAssignment());
         log.debug("Got TaskContainerMapping message: {}", mapping);
       }
     }
-    taskNameToContainerId = allMappings;
 
     for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) {
       log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
     }
 
-    return Collections.unmodifiableMap(allMappings);
+    return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9d3a6879/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 7f83494..19ab78e 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -96,6 +96,46 @@ public class TestTaskAssignmentManager {
     assertTrue(consumer.isStopped());
   }
 
+  @Test public void testDeleteMappings() throws Exception {
+    MockCoordinatorStreamSystemProducer producer =
+        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+    MockCoordinatorStreamSystemConsumer consumer =
+        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(producer, consumer);
+
+    taskAssignmentManager.register(new TaskName("ignoredTaskName"));
+    assertTrue(producer.isRegistered());
+    assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
+    assertTrue(consumer.isRegistered());
+
+    taskAssignmentManager.start();
+    assertTrue(producer.isStarted());
+    assertTrue(consumer.isStarted());
+
+    Map<String, Integer> expectedMap =
+        new HashMap<String, Integer>() {
+          {
+            this.put("Task0", new Integer(0));
+            this.put("Task1", new Integer(1));
+          }
+        };
+
+    for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) {
+      taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
+    }
+
+    Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+    assertEquals(expectedMap, localMap);
+
+    taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());
+    Map<String, Integer> deletedMap = taskAssignmentManager.readTaskAssignment();
+    assertTrue(deletedMap.isEmpty());
+
+    taskAssignmentManager.stop();
+    assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
+  }
+
   @Test public void testTaskAssignmentManagerEmptyCoordinatorStream() throws Exception {
     MockCoordinatorStreamSystemProducer producer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);