You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/26 20:43:43 UTC

samza git commit: SAMZA-1346: GroupByContainerCount.balance() should guard against null LocalityManager

Repository: samza
Updated Branches:
  refs/heads/master 4875842b3 -> fde243475


SAMZA-1346: GroupByContainerCount.balance() should guard against null LocalityManager

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Chris Pettitt <cp...@linkedin.com>

Closes #232 from jmakes/samza-1346


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fde24347
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fde24347
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fde24347

Branch: refs/heads/master
Commit: fde2434751e79f89246cb93bfa6527a3b787d4f6
Parents: 4875842
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon Jun 26 13:43:33 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon Jun 26 13:43:33 2017 -0700

----------------------------------------------------------------------
 .../grouper/task/GroupByContainerCount.java     |   5 +
 .../grouper/task/TestGroupByContainerCount.java | 120 +++++--------------
 2 files changed, 37 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fde24347/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 246188e..74c69d6 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -89,6 +89,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
 
     validateTasks(tasks);
 
+    if (localityManager == null) {
+      log.info("Locality manager is null. Cannot read or write task assignments. Invoking grouper.");
+      return group(tasks);
+    }
+
     TaskAssignmentManager taskAssignmentManager = localityManager.getTaskAssignmentManager();
     List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size());
     if (containers == null || containers.size() == 1 || containerCount == 1) {

http://git-wip-us.apache.org/repos/asf/samza/blob/fde24347/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index de4de7c..e89d673 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -651,30 +651,11 @@ public class TestGroupByContainerCount {
     Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
-    Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
+    Set<ContainerModel> groupContainers = new GroupByContainerCount(3).group(taskModels);
+    Set<ContainerModel> balanceContainers = new GroupByContainerCount(3).balance(taskModels, localityManager);
 
     // Results should be the same as calling group()
-    Map<String, ContainerModel> containersMap = new HashMap<>();
-    for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
-    }
-    assertEquals(3, containers.size());
-    ContainerModel container0 = containersMap.get("0");
-    ContainerModel container1 = containersMap.get("1");
-    ContainerModel container2 = containersMap.get("2");
-    assertNotNull(container0);
-    assertNotNull(container1);
-    assertNotNull(container2);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals("1", container1.getProcessorId());
-    assertEquals("2", container2.getProcessorId());
-    assertEquals(1, container0.getTasks().size());
-    assertEquals(1, container1.getTasks().size());
-    assertEquals(1, container2.getTasks().size());
-
-    assertTrue(container0.getTasks().containsKey(getTaskName(0)));
-    assertTrue(container1.getTasks().containsKey(getTaskName(1)));
-    assertTrue(container2.getTasks().containsKey(getTaskName(2)));
+    assertEquals(groupContainers, balanceContainers);
 
     // Verify task mappings are saved
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
@@ -691,23 +672,11 @@ public class TestGroupByContainerCount {
     Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
-    Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+    Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+    Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
-    // Results should be the same as calling group
-    Map<String, ContainerModel> containersMap = new HashMap<>();
-    for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
-    }
-
-    assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get("0");
-    assertNotNull(container0);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals(3, container0.getTasks().size());
-
-    assertTrue(container0.getTasks().containsKey(getTaskName(0)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(1)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+    // Results should be the same as calling group()
+    assertEquals(groupContainers, balanceContainers);
 
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -721,23 +690,11 @@ public class TestGroupByContainerCount {
     Set<TaskModel> taskModels = generateTaskModels(3);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, String>());
 
-    Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
-
-    // Results should be the same as calling group
-    Map<String, ContainerModel> containersMap = new HashMap<>();
-    for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
-    }
-
-    assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get("0");
-    assertNotNull(container0);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals(3, container0.getTasks().size());
+    Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+    Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
-    assertTrue(container0.getTasks().containsKey(getTaskName(0)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(1)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+    // Results should be the same as calling group()
+    assertEquals(groupContainers, balanceContainers);
 
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -750,27 +707,15 @@ public class TestGroupByContainerCount {
   public void testGroupTaskCountIncrease() {
     int taskCount = 3;
     Set<TaskModel> taskModels = generateTaskModels(taskCount);
-    Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1));
+    Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1)); // Here's the key step
     Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
-    Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
-
-    // Results should be the same as calling group
-    Map<String, ContainerModel> containersMap = new HashMap<>();
-    for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
-    }
+    Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+    Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
-    assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get("0");
-    assertNotNull(container0);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals(3, container0.getTasks().size());
-
-    assertTrue(container0.getTasks().containsKey(getTaskName(0)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(1)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+    // Results should be the same as calling group()
+    assertEquals(groupContainers, balanceContainers);
 
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -783,27 +728,15 @@ public class TestGroupByContainerCount {
   public void testGroupTaskCountDecrease() {
     int taskCount = 3;
     Set<TaskModel> taskModels = generateTaskModels(taskCount);
-    Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1));
+    Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1)); // Here's the key step
     Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
-    Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+    Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+    Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
-    // Results should be the same as calling group
-    Map<String, ContainerModel> containersMap = new HashMap<>();
-    for (ContainerModel container : containers) {
-      containersMap.put(container.getProcessorId(), container);
-    }
-
-    assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get("0");
-    assertNotNull(container0);
-    assertEquals("0", container0.getProcessorId());
-    assertEquals(3, container0.getTasks().size());
-
-    assertTrue(container0.getTasks().containsKey(getTaskName(0)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(1)));
-    assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+    // Results should be the same as calling group()
+    assertEquals(groupContainers, balanceContainers);
 
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
     verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -857,4 +790,15 @@ public class TestGroupByContainerCount {
     new GroupByContainerCount(3).balance(taskModels, localityManager); //Should throw
 
   }
+
+  @Test
+  public void testBalancerWithNullLocalityManager() {
+    Set<TaskModel> taskModels = generateTaskModels(3);
+
+    Set<ContainerModel> groupContainers = new GroupByContainerCount(3).group(taskModels);
+    Set<ContainerModel> balanceContainers = new GroupByContainerCount(3).balance(taskModels, null);
+
+    // Results should be the same as calling group()
+    assertEquals(groupContainers, balanceContainers);
+  }
 }