You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/05 17:58:41 UTC
samza git commit: SAMZA-1933: Fix NPE in LocalityManager.
Repository: samza
Updated Branches:
refs/heads/master d48cf5f74 -> b101ff91a
SAMZA-1933: Fix NPE in LocalityManager.
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #684 from shanthoosh/fix_NPE_in_task_assignment_manager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b101ff91
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b101ff91
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b101ff91
Branch: refs/heads/master
Commit: b101ff91a8f9dae77fb64b846f150bcf8154e598
Parents: d48cf5f
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Fri Oct 5 10:58:39 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Fri Oct 5 10:58:39 2018 -0700
----------------------------------------------------------------------
.../apache/samza/container/LocalityManager.java | 8 --
.../grouper/task/GroupByContainerCount.java | 88 ++++++++-------
.../task/GroupByContainerCountFactory.java | 4 +-
.../grouper/task/TaskAssignmentManager.java | 2 +-
.../grouper/task/TestGroupByContainerCount.java | 106 +++++++++++--------
.../grouper/task/TestGroupByContainerIds.java | 12 ++-
.../grouper/task/TestTaskAssignmentManager.java | 12 ++-
.../samza/coordinator/TestJobModelManager.java | 11 +-
.../samza/rest/proxy/task/SamzaTaskProxy.java | 4 +-
.../webapp/TestApplicationMasterRestClient.java | 5 +-
10 files changed, 144 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 20e86d9..63483b7 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
@@ -48,7 +47,6 @@ public class LocalityManager {
private final Serde<String> keySerde;
private final Serde<String> valueSerde;
private final MetadataStore metadataStore;
- private final TaskAssignmentManager taskAssignmentManager;
/**
* Builds the LocalityManager based upon {@link Config} and {@link MetricsRegistry}.
@@ -81,7 +79,6 @@ public class LocalityManager {
this.metadataStore.init();
this.keySerde = keySerde;
this.valueSerde = valueSerde;
- this.taskAssignmentManager = new TaskAssignmentManager(config, metricsRegistry, keySerde, valueSerde);
}
/**
@@ -128,10 +125,5 @@ public class LocalityManager {
public void close() {
metadataStore.close();
- taskAssignmentManager.close();
- }
-
- public TaskAssignmentManager getTaskAssignmentManager() {
- return taskAssignmentManager;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index b4d6c90..759f82e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -29,10 +29,13 @@ import java.util.Map;
import java.util.Set;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +53,12 @@ import org.slf4j.LoggerFactory;
public class GroupByContainerCount implements BalancingTaskNameGrouper {
private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
private final int containerCount;
+ private final Config config;
- public GroupByContainerCount(int containerCount) {
+ public GroupByContainerCount(Config config) {
+ this.containerCount = new JobConfig(config).getContainerCount();
+ this.config = config;
if (containerCount <= 0) throw new IllegalArgumentException("Must have at least one container");
- this.containerCount = containerCount;
}
@Override
@@ -94,51 +99,56 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
return group(tasks);
}
- TaskAssignmentManager taskAssignmentManager = localityManager.getTaskAssignmentManager();
- List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size());
- if (containers == null || containers.size() == 1 || containerCount == 1) {
- log.info("Balancing does not apply. Invoking grouper.");
- Set<ContainerModel> models = group(tasks);
- saveTaskAssignments(models, taskAssignmentManager);
- return models;
- }
+ TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
+ taskAssignmentManager.init();
+ try {
+ List<TaskGroup> containers = getPreviousContainers(taskAssignmentManager, tasks.size());
+ if (containers == null || containers.size() == 1 || containerCount == 1) {
+ log.info("Balancing does not apply. Invoking grouper.");
+ Set<ContainerModel> models = group(tasks);
+ saveTaskAssignments(models, taskAssignmentManager);
+ return models;
+ }
- int prevContainerCount = containers.size();
- int containerDelta = containerCount - prevContainerCount;
- if (containerDelta == 0) {
- log.info("Container count has not changed. Reusing previous container models.");
- return buildContainerModels(tasks, containers);
- }
- log.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount);
+ int prevContainerCount = containers.size();
+ int containerDelta = containerCount - prevContainerCount;
+ if (containerDelta == 0) {
+ log.info("Container count has not changed. Reusing previous container models.");
+ return buildContainerModels(tasks, containers);
+ }
+ log.info("Container count changed from {} to {}. Balancing tasks.", prevContainerCount, containerCount);
- // Calculate the expected task count per container
- int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount);
+ // Calculate the expected task count per container
+ int[] expectedTaskCountPerContainer = calculateTaskCountPerContainer(tasks.size(), prevContainerCount, containerCount);
- // Collect excess tasks from over-assigned containers
- List<String> taskNamesToReassign = new LinkedList<>();
- for (int i = 0; i < prevContainerCount; i++) {
- TaskGroup taskGroup = containers.get(i);
- while (taskGroup.size() > expectedTaskCountPerContainer[i]) {
- taskNamesToReassign.add(taskGroup.removeTask());
+ // Collect excess tasks from over-assigned containers
+ List<String> taskNamesToReassign = new LinkedList<>();
+ for (int i = 0; i < prevContainerCount; i++) {
+ TaskGroup taskGroup = containers.get(i);
+ while (taskGroup.size() > expectedTaskCountPerContainer[i]) {
+ taskNamesToReassign.add(taskGroup.removeTask());
+ }
}
- }
- // Assign tasks to the under-assigned containers
- if (containerDelta > 0) {
- List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount);
- containers.addAll(newContainers);
- } else {
- containers = containers.subList(0, containerCount);
- }
- assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers);
+ // Assign tasks to the under-assigned containers
+ if (containerDelta > 0) {
+ List<TaskGroup> newContainers = createContainers(prevContainerCount, containerCount);
+ containers.addAll(newContainers);
+ } else {
+ containers = containers.subList(0, containerCount);
+ }
+ assignTasksToContainers(expectedTaskCountPerContainer, taskNamesToReassign, containers);
- // Transform containers to containerModel
- Set<ContainerModel> models = buildContainerModels(tasks, containers);
+ // Transform containers to containerModel
+ Set<ContainerModel> models = buildContainerModels(tasks, containers);
- // Save the results
- saveTaskAssignments(models, taskAssignmentManager);
+ // Save the results
+ saveTaskAssignments(models, taskAssignmentManager);
- return models;
+ return models;
+ } finally {
+ taskAssignmentManager.close();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
index f0e9686..06aba33 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.java
@@ -19,8 +19,6 @@
package org.apache.samza.container.grouper.task;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-
/**
* Factory to build the GroupByContainerCount class.
@@ -28,6 +26,6 @@ import org.apache.samza.config.JobConfig;
public class GroupByContainerCountFactory implements TaskNameGrouperFactory {
@Override
public TaskNameGrouper build(Config config) {
- return new GroupByContainerCount(new JobConfig(config).getContainerCount());
+ return new GroupByContainerCount(config);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 2bfd4c3..0ada91c 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -83,7 +83,7 @@ public class TaskAssignmentManager {
this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
}
- public void init(Config config, MetricsRegistry metricsRegistry) {
+ public void init() {
this.metadataStore.init();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index 8d2d394..0c2f2fb 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -25,42 +25,53 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.apache.samza.container.mock.ContainerMocks.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class})
public class TestGroupByContainerCount {
private TaskAssignmentManager taskAssignmentManager;
private LocalityManager localityManager;
@Before
- public void setup() {
+ public void setup() throws Exception {
taskAssignmentManager = mock(TaskAssignmentManager.class);
localityManager = mock(LocalityManager.class);
- when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager);
+ PowerMockito.whenNew(TaskAssignmentManager.class).withAnyArguments().thenReturn(taskAssignmentManager);
+ Mockito.doNothing().when(taskAssignmentManager).init();
}
@Test(expected = IllegalArgumentException.class)
public void testGroupEmptyTasks() {
- new GroupByContainerCount(1).group(new HashSet());
+ new GroupByContainerCount(getConfig(1)).group(new HashSet());
}
@Test(expected = IllegalArgumentException.class)
public void testGroupFewerTasksThanContainers() {
Set<TaskModel> taskModels = new HashSet<>();
taskModels.add(getTaskModel(1));
- new GroupByContainerCount(2).group(taskModels);
+ new GroupByContainerCount(getConfig(2)).group(taskModels);
}
@Test(expected = UnsupportedOperationException.class)
public void testGrouperResultImmutable() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> containers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(3)).group(taskModels);
containers.remove(containers.iterator().next());
}
@@ -68,7 +79,7 @@ public class TestGroupByContainerCount {
public void testGroupHappyPath() {
Set<TaskModel> taskModels = generateTaskModels(5);
- Set<ContainerModel> containers = new GroupByContainerCount(2).group(taskModels);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).group(taskModels);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -95,7 +106,7 @@ public class TestGroupByContainerCount {
public void testGroupManyTasks() {
Set<TaskModel> taskModels = generateTaskModels(21);
- Set<ContainerModel> containers = new GroupByContainerCount(2).group(taskModels);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).group(taskModels);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -163,11 +174,11 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerAfterContainerIncrease() {
Set<TaskModel> taskModels = generateTaskModels(9);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(2)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(4).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(4)).balance(taskModels, localityManager);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -245,11 +256,11 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerAfterContainerDecrease() {
Set<TaskModel> taskModels = generateTaskModels(9);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(4).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(4)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).balance(taskModels, localityManager);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -320,15 +331,15 @@ public class TestGroupByContainerCount {
* T8 T7 T3
*/
@Test
- public void testBalancerMultipleReblances() {
+ public void testBalancerMultipleReblances() throws Exception {
// Before
Set<TaskModel> taskModels = generateTaskModels(9);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(4).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(4)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
// First balance
- Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).balance(taskModels, localityManager);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -380,9 +391,9 @@ public class TestGroupByContainerCount {
TaskAssignmentManager taskAssignmentManager2 = mock(TaskAssignmentManager.class);
when(taskAssignmentManager2.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
LocalityManager localityManager2 = mock(LocalityManager.class);
- when(localityManager2.getTaskAssignmentManager()).thenReturn(taskAssignmentManager2);
+ PowerMockito.whenNew(TaskAssignmentManager.class).withAnyArguments().thenReturn(taskAssignmentManager2);
- containers = new GroupByContainerCount(3).balance(taskModels, localityManager2);
+ containers = new GroupByContainerCount(getConfig(3)).balance(taskModels, localityManager2);
containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -455,11 +466,11 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerAfterContainerSame() {
Set<TaskModel> taskModels = generateTaskModels(9);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(2)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).balance(taskModels, localityManager);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -529,7 +540,7 @@ public class TestGroupByContainerCount {
prevTaskToContainerMapping.put(getTaskName(8).getTaskName(), "1");
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).balance(taskModels, localityManager);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -595,7 +606,7 @@ public class TestGroupByContainerCount {
prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), "1");
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(3)).balance(taskModels, localityManager);
Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
@@ -636,12 +647,12 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerOldContainerCountOne() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(1).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(1)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> groupContainers = new GroupByContainerCount(3).group(taskModels);
- Set<ContainerModel> balanceContainers = new GroupByContainerCount(3).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(getConfig(3)).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(getConfig(3)).balance(taskModels, localityManager);
// Results should be the same as calling group()
assertEquals(groupContainers, balanceContainers);
@@ -657,12 +668,12 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerNewContainerCountOne() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(3)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
- Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(getConfig(1)).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(getConfig(1)).balance(taskModels, localityManager);
// Results should be the same as calling group()
assertEquals(groupContainers, balanceContainers);
@@ -677,10 +688,10 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerEmptyTaskMapping() {
Set<TaskModel> taskModels = generateTaskModels(3);
- when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, String>());
+ when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<>());
- Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
- Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(getConfig(1)).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(getConfig(1)).balance(taskModels, localityManager);
// Results should be the same as calling group()
assertEquals(groupContainers, balanceContainers);
@@ -696,12 +707,12 @@ public class TestGroupByContainerCount {
public void testGroupTaskCountIncrease() {
int taskCount = 3;
Set<TaskModel> taskModels = generateTaskModels(taskCount);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1)); // Here's the key step
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(2)).group(generateTaskModels(taskCount - 1)); // Here's the key step
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
- Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(getConfig(1)).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(getConfig(1)).balance(taskModels, localityManager);
// Results should be the same as calling group()
assertEquals(groupContainers, balanceContainers);
@@ -717,12 +728,12 @@ public class TestGroupByContainerCount {
public void testGroupTaskCountDecrease() {
int taskCount = 3;
Set<TaskModel> taskModels = generateTaskModels(taskCount);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1)); // Here's the key step
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(3)).group(generateTaskModels(taskCount + 1)); // Here's the key step
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> groupContainers = new GroupByContainerCount(1).group(taskModels);
- Set<ContainerModel> balanceContainers = new GroupByContainerCount(1).balance(taskModels, localityManager);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(getConfig(1)).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(getConfig(1)).balance(taskModels, localityManager);
// Results should be the same as calling group()
assertEquals(groupContainers, balanceContainers);
@@ -737,31 +748,31 @@ public class TestGroupByContainerCount {
@Test(expected = IllegalArgumentException.class)
public void testBalancerNewContainerCountGreaterThanTasks() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(3)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- new GroupByContainerCount(5).balance(taskModels, localityManager); // Should throw
+ new GroupByContainerCount(getConfig(5)).balance(taskModels, localityManager); // Should throw
}
@Test(expected = IllegalArgumentException.class)
public void testBalancerEmptyTasks() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(3)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- new GroupByContainerCount(5).balance(new HashSet<TaskModel>(), localityManager); // Should throw
+ new GroupByContainerCount(getConfig(5)).balance(new HashSet<>(), localityManager); // Should throw
}
@Test(expected = UnsupportedOperationException.class)
public void testBalancerResultImmutable() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
+ Set<ContainerModel> prevContainers = new GroupByContainerCount(getConfig(3)).group(taskModels);
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
+ Set<ContainerModel> containers = new GroupByContainerCount(getConfig(2)).balance(taskModels, localityManager);
containers.remove(containers.iterator().next());
}
@@ -776,7 +787,7 @@ public class TestGroupByContainerCount {
Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
- new GroupByContainerCount(3).balance(taskModels, localityManager); //Should throw
+ new GroupByContainerCount(getConfig(3)).balance(taskModels, localityManager); //Should throw
}
@@ -784,10 +795,17 @@ public class TestGroupByContainerCount {
public void testBalancerWithNullLocalityManager() {
Set<TaskModel> taskModels = generateTaskModels(3);
- Set<ContainerModel> groupContainers = new GroupByContainerCount(3).group(taskModels);
- Set<ContainerModel> balanceContainers = new GroupByContainerCount(3).balance(taskModels, null);
+ Set<ContainerModel> groupContainers = new GroupByContainerCount(getConfig(3)).group(taskModels);
+ Set<ContainerModel> balanceContainers = new GroupByContainerCount(getConfig(3)).balance(taskModels, null);
// Results should be the same as calling group()
assertEquals(groupContainers, balanceContainers);
}
+
+
+ Config getConfig(int containerCount) {
+ Map<String, String> config = new HashMap<>();
+ config.put(JobConfig.JOB_CONTAINER_COUNT(), String.valueOf(containerCount));
+ return new MapConfig(config);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
index b9fe6fb..5bb78e8 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
@@ -37,21 +37,25 @@ import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.apache.samza.container.mock.ContainerMocks.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TaskAssignmentManager.class, GroupByContainerIds.class})
public class TestGroupByContainerIds {
@Before
- public void setup() {
+ public void setup() throws Exception {
TaskAssignmentManager taskAssignmentManager = mock(TaskAssignmentManager.class);
LocalityManager localityManager = mock(LocalityManager.class);
- when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager);
-
-
+ PowerMockito.whenNew(TaskAssignmentManager.class).withAnyArguments().thenReturn(taskAssignmentManager);
}
private Config buildConfigForContainerCount(int count) {
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 879171e..fcdbf08 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -68,7 +68,7 @@ public class TestTaskAssignmentManager {
@Test
public void testTaskAssignmentManager() {
TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
- taskAssignmentManager.init(config, new MetricsRegistryMap());
+ taskAssignmentManager.init();
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
@@ -83,9 +83,10 @@ public class TestTaskAssignmentManager {
taskAssignmentManager.close();
}
- @Test public void testDeleteMappings() {
+ @Test
+ public void testDeleteMappings() {
TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
- taskAssignmentManager.init(config, new MetricsRegistryMap());
+ taskAssignmentManager.init();
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
@@ -104,9 +105,10 @@ public class TestTaskAssignmentManager {
taskAssignmentManager.close();
}
- @Test public void testTaskAssignmentManagerEmptyCoordinatorStream() {
+ @Test
+ public void testTaskAssignmentManagerEmptyCoordinatorStream() {
TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
- taskAssignmentManager.init(config, new MetricsRegistryMap());
+ taskAssignmentManager.init();
Map<String, String> expectedMap = new HashMap<>();
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 3130ed6..1dbf132 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -23,6 +23,7 @@ import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.GroupByContainerCount;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
@@ -45,12 +46,18 @@ import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentMatcher;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import scala.collection.JavaConversions;
/**
* Unit tests for {@link JobModelManager}
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TaskAssignmentManager.class, GroupByContainerCount.class})
public class TestJobModelManager {
private final TaskAssignmentManager mockTaskManager = mock(TaskAssignmentManager.class);
private final LocalityManager mockLocalityManager = mock(LocalityManager.class);
@@ -67,7 +74,7 @@ public class TestJobModelManager {
private JobModelManager jobModelManager;
@Before
- public void setup() {
+ public void setup() throws Exception {
when(mockLocalityManager.readContainerLocality()).thenReturn(this.localityMappings);
when(mockStreamMetadataCache.getStreamMetadata(argThat(new ArgumentMatcher<scala.collection.immutable.Set<SystemStream>>() {
@Override
@@ -77,7 +84,7 @@ public class TestJobModelManager {
}
}), anyBoolean())).thenReturn(mockStreamMetadataMap);
when(mockStreamMetadata.getSystemStreamPartitionMetadata()).thenReturn(mockSspMetadataMap);
- when(mockLocalityManager.getTaskAssignmentManager()).thenReturn(mockTaskManager);
+ PowerMockito.whenNew(TaskAssignmentManager.class).withAnyArguments().thenReturn(mockTaskManager);
when(mockTaskManager.readTaskAssignment()).thenReturn(Collections.EMPTY_MAP);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index b14d14b..daf665b 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -34,6 +34,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -131,7 +132,8 @@ public class SamzaTaskProxy implements TaskProxy {
protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
LocalityManager localityManager = new LocalityManager(consumer.getConfig(), new MetricsRegistryMap());
Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
- Map<String, String> taskNameToContainerIdMapping = localityManager.getTaskAssignmentManager().readTaskAssignment();
+ TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(consumer.getConfig(), new MetricsRegistryMap());
+ Map<String, String> taskNameToContainerIdMapping = taskAssignmentManager.readTaskAssignment();
StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
return taskNameToContainerIdMapping.entrySet()
http://git-wip-us.apache.org/repos/asf/samza/blob/b101ff91/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
index 9c0dea7..d19badc 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
@@ -44,6 +44,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.SamzaApplicationState;
import org.apache.samza.clustermanager.SamzaResource;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GroupByContainerCount;
@@ -275,7 +276,9 @@ public class TestApplicationMasterRestClient {
new TaskModel(new TaskName("task2"),
ImmutableSet.of(new SystemStreamPartition(new SystemStream("system1", "stream1"), new Partition(1))),
new Partition(1)));
- GroupByContainerCount grouper = new GroupByContainerCount(2);
+ Map<String, String> config = new HashMap<>();
+ config.put(JobConfig.JOB_CONTAINER_COUNT(), String.valueOf(2));
+ GroupByContainerCount grouper = new GroupByContainerCount(new MapConfig(config));
Set<ContainerModel> containerModels = grouper.group(taskModels);
HashMap<String, ContainerModel> containers = new HashMap<>();
for (ContainerModel containerModel : containerModels) {