You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/06/20 15:35:55 UTC
samza git commit: SAMZA-1334: fix pre-condition for
ContainerAllocator to work properly
Repository: samza
Updated Branches:
refs/heads/master e827d150f -> 8aa75467e
SAMZA-1334: fix pre-condition for ContainerAllocator to work properly
We have observed issues when the LocalityManager reports the container locality mapping while the host-affinity is disabled in ContainerAllocator, in which the ContainerAllocator failed to release extra containers.
Hence, fix is in the form of make sure the pre-condition is met for the ContainerAllocator w/o host-affinity: the localityMap from the JobModel should contain no preferred host info.
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Reviewers: Jagadish <ja...@gmail.com>
Closes #228 from nickpan47/SAMZA-1334 and squashes the following commits:
ad3320f [Yi Pan (Data Infrastructure)] SAMZA-1334: fix the pre-conditions for ContainerAllocator to work properly. Make sure JobModel is generated w/o LocalityManager if host-affinity is disabled
f76fff1 [Yi Pan (Data Infrastructure)] WIP: SAMZA-1334 fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8aa75467
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8aa75467
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8aa75467
Branch: refs/heads/master
Commit: 8aa75467e1d4cc7eb606f63dfe6cb667c4a58460
Parents: e827d15
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Jun 20 08:35:39 2017 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Jun 20 08:35:39 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/job/model/JobModel.java | 4 +-
.../samza/coordinator/JobModelManager.scala | 12 +-
.../samza/clustermanager/MockHttpServer.java | 56 --------
.../clustermanager/TestContainerAllocator.java | 33 ++---
.../TestContainerProcessManager.java | 42 +++---
.../TestHostAwareContainerAllocator.java | 41 +++---
.../coordinator/JobModelManagerTestUtil.java | 66 +++++++++
.../samza/coordinator/TestJobModelManager.java | 135 +++++++++++++++++++
.../apache/samza/testUtils/MockHttpServer.java | 56 ++++++++
9 files changed, 323 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dbb3867..1115faf 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -44,7 +44,7 @@ public class JobModel {
private final Map<String, ContainerModel> containers;
private final LocalityManager localityManager;
- private Map<String, String> localityMappings = new HashMap<String, String>();
+ private final Map<String, String> localityMappings;
public int maxChangeLogStreamPartitions;
@@ -57,6 +57,8 @@ public class JobModel {
this.containers = Collections.unmodifiableMap(containers);
this.localityManager = localityManager;
+ // initialize container localityMappings
+ this.localityMappings = new HashMap<>();
if (localityManager == null) {
for (String containerId : containers.keySet()) {
localityMappings.put(containerId, null);
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 353e297..6319173 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -23,6 +23,7 @@ package org.apache.samza.coordinator
import java.util
import java.util.concurrent.atomic.AtomicReference
+import org.apache.samza.config.ClusterManagerConfig
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
@@ -98,6 +99,7 @@ object JobModelManager extends Logging {
info("Got config: %s" format config)
val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE)
changelogManager.start()
+
val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
// We don't need to start() localityManager as they share the same instances with checkpoint and changelog managers.
// TODO: This code will go away with refactoring - SAMZA-678
@@ -228,6 +230,8 @@ object JobModelManager extends Logging {
val groups = grouper.group(allSystemStreamPartitions.asJava)
info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet()))
+ val isHostAffinityEnabled = new ClusterManagerConfig(config).getHostAffinityEnabled
+
// If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
// mapping.
var maxChangelogPartitionId = changeLogPartitionMapping.asScala.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
@@ -256,13 +260,17 @@ object JobModelManager extends Logging {
val containerGrouper = containerGrouperFactory.build(config)
val containerModels = {
containerGrouper match {
- case grouper: BalancingTaskNameGrouper => grouper.balance(taskModels.asJava, localityManager)
+ case grouper: BalancingTaskNameGrouper if isHostAffinityEnabled => grouper.balance(taskModels.asJava, localityManager)
case _ => containerGrouper.group(taskModels.asJava, containerIds)
}
}
val containerMap = containerModels.asScala.map { case (containerModel) => containerModel.getProcessorId -> containerModel }.toMap
- new JobModel(config, containerMap.asJava, localityManager)
+ if (isHostAffinityEnabled) {
+ new JobModel(config, containerMap.asJava, localityManager)
+ } else {
+ new JobModel(config, containerMap.asJava)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java
deleted file mode 100644
index 4f44ced..0000000
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.clustermanager;
-
-import org.apache.samza.coordinator.server.HttpServer;
-import org.eclipse.jetty.servlet.ServletHolder;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-public class MockHttpServer extends HttpServer {
-
- public MockHttpServer(String rootPath, int port, String resourceBasePath, ServletHolder defaultHolder) {
- super(rootPath, port, resourceBasePath, defaultHolder);
- start();
- }
-
- @Override
- public void start() {
- super.running_$eq(true);
- }
-
- @Override
- public void stop() {
- super.running_$eq(false);
- }
-
- @Override
- public URL getUrl() {
- if (running()) {
- try {
- return new URL("http://localhost:12345/");
- } catch (MalformedURLException mue) {
- mue.printStackTrace();
- }
- }
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 989b82a..1e9d372 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -21,12 +21,9 @@ package org.apache.samza.clustermanager;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
@@ -47,8 +44,9 @@ public class TestContainerAllocator {
private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
private final Config config = getConfig();
- private final JobModelManager reader = getJobModelReader(1);
- private final SamzaApplicationState state = new SamzaApplicationState(reader);
+ private final JobModelManager jobModelManager = JobModelManagerTestUtil.getJobModelManager(config, 1,
+ new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
+ private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
private ContainerAllocator containerAllocator;
private MockContainerRequestState requestState;
private Thread allocatorThread;
@@ -67,7 +65,7 @@ public class TestContainerAllocator {
@After
public void teardown() throws Exception {
- reader.stop();
+ jobModelManager.stop();
containerAllocator.stop();
}
@@ -93,19 +91,6 @@ public class TestContainerAllocator {
return new MapConfig(map);
}
- private static JobModelManager getJobModelReader(int containerCount) {
- //Ideally, the JobModelReader should be constructed independent of HttpServer.
- //That way it becomes easier to mock objects. Save it for later.
-
- HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
- Map<String, ContainerModel> containers = new java.util.HashMap<>();
- for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
- containers.put(String.valueOf(i), container);
- }
- JobModel jobModel = new JobModel(getConfig(), containers);
- return new JobModelManager(jobModel, server, null);
- }
/**
@@ -132,10 +117,10 @@ public class TestContainerAllocator {
public void testRequestContainers() throws Exception {
Map<String, String> containersToHostMapping = new HashMap<String, String>() {
{
- put("0", "abc");
- put("1", "def");
+ put("0", null);
+ put("1", null);
put("2", null);
- put("3", "abc");
+ put("3", null);
}
};
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 660012e..8199559 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -23,14 +23,12 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManagerTestUtil;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
@@ -79,7 +77,7 @@ public class TestContainerProcessManager {
private Config getConfigWithHostAffinity() {
Map<String, String> map = new HashMap<>();
map.putAll(config);
- map.put("yarn.samza.host-affinity.enabled", "true");
+ map.put("job.host-affinity.enabled", "true");
return new MapConfig(map);
}
@@ -87,30 +85,24 @@ public class TestContainerProcessManager {
private SamzaApplicationState state = null;
- private JobModelManager getCoordinator(int containerCount) {
- Map<String, ContainerModel> containers = new java.util.HashMap<>();
- for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
- containers.put(String.valueOf(i), container);
- }
+ private JobModelManager getJobModelManagerWithHostAffinity(int containerCount) {
Map<String, Map<String, String>> localityMap = new HashMap<>();
localityMap.put("0", new HashMap<String, String>() { {
put(SetContainerHostMapping.HOST_KEY, "abc");
- }
- });
+ } });
LocalityManager mockLocalityManager = mock(LocalityManager.class);
when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
- JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
- JobModelManager.jobModelRef().getAndSet(jobModel);
+ return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerCount, mockLocalityManager, this.server);
+ }
- return new JobModelManager(jobModel, this.server, null);
+ private JobModelManager getJobModelManagerWithoutHostAffinity(int containerCount) {
+ return JobModelManagerTestUtil.getJobModelManager(getConfig(), containerCount, this.server);
}
@Before
public void setup() throws Exception {
server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
- state = new SamzaApplicationState(getCoordinator(1));
}
private Field getPrivateFieldFromTaskManager(String fieldName, ContainerProcessManager object) throws Exception {
@@ -127,6 +119,7 @@ public class TestContainerProcessManager {
conf.put("yarn.container.memory.mb", "500");
conf.put("yarn.container.cpu.cores", "5");
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -146,6 +139,7 @@ public class TestContainerProcessManager {
conf.put("yarn.container.memory.mb", "500");
conf.put("yarn.container.cpu.cores", "5");
+ state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(1));
taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -164,6 +158,8 @@ public class TestContainerProcessManager {
@Test
public void testOnInit() throws Exception {
Config conf = getConfig();
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -200,6 +196,8 @@ public class TestContainerProcessManager {
@Test
public void testOnShutdown() throws Exception {
Config conf = getConfig();
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -226,6 +224,8 @@ public class TestContainerProcessManager {
@Test
public void testTaskManagerShouldStopWhenContainersFinish() {
Config conf = getConfig();
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -251,6 +251,7 @@ public class TestContainerProcessManager {
@Test
public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
Config conf = getConfig();
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
@@ -330,6 +331,8 @@ public class TestContainerProcessManager {
config.putAll(getConfig());
config.remove("yarn.container.retry.count");
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
ContainerProcessManager taskManager = new ContainerProcessManager(
new MapConfig(conf),
state,
@@ -393,8 +396,11 @@ public class TestContainerProcessManager {
@Test
public void testAppMasterWithFwk() {
+ Config conf = getConfig();
+ state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+
ContainerProcessManager taskManager = new ContainerProcessManager(
- new MapConfig(config),
+ new MapConfig(conf),
state,
new MetricsRegistryMap(),
manager
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index 83d31e2..32ec2d2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -26,12 +26,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskName;
+import org.apache.samza.container.LocalityManager;
import org.apache.samza.coordinator.JobModelManager;
-import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.coordinator.JobModelManagerTestUtil;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
@@ -42,13 +41,28 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestHostAwareContainerAllocator {
private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);
private final Config config = getConfig();
- private final JobModelManager reader = getJobModelManager(1);
+ private final JobModelManager reader = initializeJobModelManager(config, 1);
+
+ private JobModelManager initializeJobModelManager(Config config, int containerCount) {
+ Map<String, Map<String, String>> localityMap = new HashMap<>();
+ localityMap.put("0", new HashMap<String, String>() { {
+ put(SetContainerHostMapping.HOST_KEY, "abc");
+ } });
+ LocalityManager mockLocalityManager = mock(LocalityManager.class);
+ when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
+
+ return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerCount, mockLocalityManager,
+ new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
+ }
+
private final SamzaApplicationState state = new SamzaApplicationState(reader);
private HostAwareContainerAllocator containerAllocator;
private final int timeoutMillis = 1000;
@@ -334,19 +348,4 @@ public class TestHostAwareContainerAllocator {
return new MapConfig(map);
}
- private static JobModelManager getJobModelManager(int containerCount) {
- //Ideally, the JobModelReader should be constructed independent of HttpServer.
- //That way it becomes easier to mock objects. Save it for later.
-
- HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
- Map<String, ContainerModel> containers = new java.util.HashMap<>();
- for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
- containers.put(String.valueOf(i), container);
- }
- JobModel jobModel = new JobModel(getConfig(), containers);
- return new JobModelManager(jobModel, server, null);
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
new file mode 100644
index 0000000..b7514c4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.StreamMetadataCache;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utils to create instances of {@link JobModelManager} in unit tests
+ */
+public class JobModelManagerTestUtil {
+
+ public static JobModelManager getJobModelManager(Config config, int containerCount, HttpServer server) {
+ return getJobModelManagerWithLocalityManager(config, containerCount, null, server);
+ }
+
+ public static JobModelManager getJobModelManagerWithLocalityManager(Config config, int containerCount, LocalityManager localityManager, HttpServer server) {
+ Map<String, ContainerModel> containers = new java.util.HashMap<>();
+ for (int i = 0; i < containerCount; i++) {
+ ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+ containers.put(String.valueOf(i), container);
+ }
+ JobModel jobModel = new JobModel(config, containers, localityManager);
+ return new JobModelManager(jobModel, server, null);
+ }
+
+ public static JobModelManager getJobModelManagerUsingReadModel(Config config, int containerCount, StreamMetadataCache streamMetadataCache,
+ LocalityManager locManager, HttpServer server) {
+ List<String> containerIds = new ArrayList<>();
+ for (int i = 0; i < containerCount; i++) {
+ containerIds.add(String.valueOf(i));
+ }
+ JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), locManager, streamMetadataCache, containerIds);
+ return new JobModelManager(jobModel, server, null);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
new file mode 100644
index 0000000..1d6fc65
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.testUtils.MockHttpServer;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.mockito.ArgumentMatcher;
+import scala.collection.JavaConversions;
+
+/**
+ * Unit tests for {@link JobModelManager}
+ */
+public class TestJobModelManager {
+ private final TaskAssignmentManager mockTaskManager = mock(TaskAssignmentManager.class);
+ private final LocalityManager mockLocalityManager = mock(LocalityManager.class);
+ private final Map<String, Map<String, String>> localityMappings = new HashMap<>();
+ private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
+ private final SystemStream inputStream = new SystemStream("test-system", "test-stream");
+ private final SystemStreamMetadata.SystemStreamPartitionMetadata mockSspMetadata = mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class);
+ private final Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> mockSspMetadataMap = Collections.singletonMap(new Partition(0), mockSspMetadata);
+ private final SystemStreamMetadata mockStreamMetadata = mock(SystemStreamMetadata.class);
+ private final scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> mockStreamMetadataMap = new scala.collection.immutable.Map.Map1(inputStream, mockStreamMetadata);
+ private final StreamMetadataCache mockStreamMetadataCache = mock(StreamMetadataCache.class);
+ private final scala.collection.immutable.Set<SystemStream> inputStreamSet = JavaConversions.asScalaSet(Collections.singleton(inputStream)).toSet();
+
+ private JobModelManager jobModelManager;
+
+ @Before
+ public void setup() {
+ when(mockLocalityManager.readContainerLocality()).thenReturn(this.localityMappings);
+ when(mockStreamMetadataCache.getStreamMetadata(argThat(new ArgumentMatcher<scala.collection.immutable.Set<SystemStream>>() {
+ @Override
+ public boolean matches(Object argument) {
+ scala.collection.immutable.Set<SystemStream> set = (scala.collection.immutable.Set<SystemStream>) argument;
+ return set.equals(inputStreamSet);
+ }
+ }), anyBoolean())).thenReturn(mockStreamMetadataMap);
+ when(mockStreamMetadata.getSystemStreamPartitionMetadata()).thenReturn(mockSspMetadataMap);
+ when(mockLocalityManager.getTaskAssignmentManager()).thenReturn(mockTaskManager);
+ when(mockTaskManager.readTaskAssignment()).thenReturn(Collections.EMPTY_MAP);
+ }
+
+ @Test
+ public void testLocalityMapWithHostAffinity() {
+ Config config = new MapConfig(new HashMap<String, String>() {
+ {
+ put("yarn.container.count", "1");
+ put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
+ put("yarn.container.memory.mb", "512");
+ put("yarn.package.path", "/foo");
+ put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
+ put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
+ put("yarn.container.retry.count", "1");
+ put("yarn.container.retry.window.ms", "1999999999");
+ put("yarn.allocator.sleep.ms", "10");
+ put("job.host-affinity.enabled", "true");
+ }
+ });
+
+ this.localityMappings.put("0", new HashMap<String, String>() { {
+ put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
+ } });
+ this.jobModelManager = JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, 1, mockStreamMetadataCache, mockLocalityManager, server);
+
+ assertEquals(jobModelManager.jobModel().getAllContainerLocality(), new HashMap<String, String>() { { this.put("0", "abc-affinity"); } });
+ }
+
+ @Test
+ public void testLocalityMapWithoutHostAffinity() {
+ Config config = new MapConfig(new HashMap<String, String>() {
+ {
+ put("yarn.container.count", "1");
+ put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
+ put("yarn.container.memory.mb", "512");
+ put("yarn.package.path", "/foo");
+ put("task.inputs", "test-system.test-stream");
+ put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
+ put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
+ put("yarn.container.retry.count", "1");
+ put("yarn.container.retry.window.ms", "1999999999");
+ put("yarn.allocator.sleep.ms", "10");
+ put("job.host-affinity.enabled", "false");
+ }
+ });
+
+ this.localityMappings.put("0", new HashMap<String, String>() { {
+ put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
+ } });
+ this.jobModelManager = JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, 1, mockStreamMetadataCache, mockLocalityManager, server);
+
+ assertEquals(jobModelManager.jobModel().getAllContainerLocality(), new HashMap<String, String>() { { this.put("0", null); } });
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8aa75467/samza-core/src/test/java/org/apache/samza/testUtils/MockHttpServer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/MockHttpServer.java b/samza-core/src/test/java/org/apache/samza/testUtils/MockHttpServer.java
new file mode 100644
index 0000000..493297e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/MockHttpServer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.testUtils;
+
+import org.apache.samza.coordinator.server.HttpServer;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class MockHttpServer extends HttpServer {
+
+ public MockHttpServer(String rootPath, int port, String resourceBasePath, ServletHolder defaultHolder) {
+ super(rootPath, port, resourceBasePath, defaultHolder);
+ start();
+ }
+
+ @Override
+ public void start() {
+ super.running_$eq(true);
+ }
+
+ @Override
+ public void stop() {
+ super.running_$eq(false);
+ }
+
+ @Override
+ public URL getUrl() {
+ if (running()) {
+ try {
+ return new URL("http://localhost:12345/");
+ } catch (MalformedURLException mue) {
+ mue.printStackTrace();
+ }
+ }
+ return null;
+ }
+}