You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/26 20:43:43 UTC
samza git commit: SAMZA-1346: GroupByContainerCount.balance() should guard against null LocalityManager
Repository: samza
Updated Branches:
refs/heads/master 4875842b3 -> fde243475
SAMZA-1346: GroupByContainerCount.balance() should guard against null LocalityManager
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Chris Pettitt <cp...@linkedin.com>
Closes #232 from jmakes/samza-1346
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fde24347
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fde24347
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fde24347
Branch: refs/heads/master
Commit: fde2434751e79f89246cb93bfa6527a3b787d4f6
Parents: 4875842
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon Jun 26 13:43:33 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon Jun 26 13:43:33 2017 -0700
----------------------------------------------------------------------
.../grouper/task/GroupByContainerCount.java | 5 +
.../grouper/task/TestGroupByContainerCount.java | 120 +++++--------------
2 files changed, 37 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fde24347/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 246188e..74c69d6 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -89,6 +89,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
validateTasks(tasks);
+ if (localityManager == null) {
+ log.info("Locality manager is null. Cannot read or write task assignments. Invoking grouper.");
+ return group(tasks);
+ }
+
TaskAssignmentManager taskAssignmentManager = localityManager.getTaskAssignmentManager();
List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size());
if (containers == null || containers.size() == 1 || containerCount == 1) {
http://git-wip-us.apache.org/repos/asf/samza/blob/fde24347/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index de4de7c..e89d673 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -651,30 +651,11 @@ public class TestGroupByContainerCount {
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(3).balance(taskModels, localityManager);
// Results should be the same as calling group()
- Map<String, ContainerModel> containersMap = new HashMap<>();
- for (ContainerModel container : containers) {
- containersMap.put(container.getProcessorId(), container);
- }
- assertEquals(3, containers.size());
- ContainerModel container0 = containersMap.get("0");
- ContainerModel container1 = containersMap.get("1");
- ContainerModel container2 = containersMap.get("2");
- assertNotNull(container0);
- assertNotNull(container1);
- assertNotNull(container2);
- assertEquals("0", container0.getProcessorId());
- assertEquals("1", container1.getProcessorId());
- assertEquals("2", container2.getProcessorId());
- assertEquals(1, container0.getTasks().size());
- assertEquals(1, container1.getTasks().size());
- assertEquals(1, container2.getTasks().size());
-
- assertTrue(container0.getTasks().containsKey(getTaskName(0)));
- assertTrue(container1.getTasks().containsKey(getTaskName(1)));
- assertTrue(container2.getTasks().containsKey(getTaskName(2)));
+ assertEquals(groupContainers, balanceContainers);
// Verify task mappings are saved
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
@@ -691,23 +672,11 @@ public class TestGroupByContainerCount {
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
- // Results should be the same as calling group
- Map<String, ContainerModel> containersMap = new HashMap<>();
- for (ContainerModel container : containers) {
- containersMap.put(container.getProcessorId(), container);
- }
-
- assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get("0");
- assertNotNull(container0);
- assertEquals("0", container0.getProcessorId());
- assertEquals(3, container0.getTasks().size());
-
- assertTrue(container0.getTasks().containsKey(getTaskName(0)));
- assertTrue(container0.getTasks().containsKey(getTaskName(1)));
- assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+ // Results should be the same as calling group()
+ assertEquals(groupContainers, balanceContainers);
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -721,23 +690,11 @@ public class TestGroupByContainerCount {
Set<TaskModel> taskModels = generateTaskModels(3);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, String>());
- Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
-
- // Results should be the same as calling group
- Map<String, ContainerModel> containersMap = new HashMap<>();
- for (ContainerModel container : containers) {
- containersMap.put(container.getProcessorId(), container);
- }
-
- assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get("0");
- assertNotNull(container0);
- assertEquals("0", container0.getProcessorId());
- assertEquals(3, container0.getTasks().size());
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
- assertTrue(container0.getTasks().containsKey(getTaskName(0)));
- assertTrue(container0.getTasks().containsKey(getTaskName(1)));
- assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+ // Results should be the same as calling group()
+ assertEquals(groupContainers, balanceContainers);
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -750,27 +707,15 @@ public class TestGroupByContainerCount {
public void testGroupTaskCountIncrease() {
int taskCount = 3;
Set<TaskModel> taskModels = generateTaskModels(taskCount);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1));
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1)); // Here's the key step
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
-
- // Results should be the same as calling group
- Map<String, ContainerModel> containersMap = new HashMap<>();
- for (ContainerModel container : containers) {
- containersMap.put(container.getProcessorId(), container);
- }
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
- assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get("0");
- assertNotNull(container0);
- assertEquals("0", container0.getProcessorId());
- assertEquals(3, container0.getTasks().size());
-
- assertTrue(container0.getTasks().containsKey(getTaskName(0)));
- assertTrue(container0.getTasks().containsKey(getTaskName(1)));
- assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+ // Results should be the same as calling group()
+ assertEquals(groupContainers, balanceContainers);
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -783,27 +728,15 @@ public class TestGroupByContainerCount {
public void testGroupTaskCountDecrease() {
int taskCount = 3;
Set<TaskModel> taskModels = generateTaskModels(taskCount);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1));
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1)); // Here's the key step
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
- // Results should be the same as calling group
- Map<String, ContainerModel> containersMap = new HashMap<>();
- for (ContainerModel container : containers) {
- containersMap.put(container.getProcessorId(), container);
- }
-
- assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get("0");
- assertNotNull(container0);
- assertEquals("0", container0.getProcessorId());
- assertEquals(3, container0.getTasks().size());
-
- assertTrue(container0.getTasks().containsKey(getTaskName(0)));
- assertTrue(container0.getTasks().containsKey(getTaskName(1)));
- assertTrue(container0.getTasks().containsKey(getTaskName(2)));
+ // Results should be the same as calling group()
+ assertEquals(groupContainers, balanceContainers);
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
@@ -857,4 +790,15 @@ public class TestGroupByContainerCount {
new GroupByContainerCount(3).balance(taskModels, localityManager); //Should throw
}
+
+ @Test
+ public void testBalancerWithNullLocalityManager() {
+ Set<TaskModel> taskModels = generateTaskModels(3);
+
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(3).balance(taskModels, null);
+
+ // Results should be the same as calling group()
+ assertEquals(groupContainers, balanceContainers);
+ }
}