You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/07 22:22:28 UTC
[1/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza
Repository: samza
Updated Branches:
refs/heads/master c74722b47 -> a7da1840f
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index d3eb7fb..1b5c904 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -34,9 +34,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestTaskAssignmentManager {
-
- private final MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
- new MockCoordinatorStreamSystemFactory();
private final Config config = new MapConfig(
new HashMap<String, String>() {
{
@@ -56,6 +53,8 @@ public class TestTaskAssignmentManager {
}
@Test public void testTaskAssignmentManager() throws Exception {
+ MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+ new MockCoordinatorStreamSystemFactory();
MockCoordinatorStreamSystemProducer producer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
MockCoordinatorStreamSystemConsumer consumer =
@@ -70,22 +69,22 @@ public class TestTaskAssignmentManager {
assertTrue(producer.isStarted());
assertTrue(consumer.isStarted());
- Map<String, Integer> expectedMap =
- new HashMap<String, Integer>() {
+ Map<String, String> expectedMap =
+ new HashMap<String, String>() {
{
- this.put("Task0", new Integer(0));
- this.put("Task1", new Integer(1));
- this.put("Task2", new Integer(2));
- this.put("Task3", new Integer(0));
- this.put("Task4", new Integer(1));
+ this.put("Task0", "0");
+ this.put("Task1", "1");
+ this.put("Task2", "2");
+ this.put("Task3", "0");
+ this.put("Task4", "1");
}
};
- for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) {
+ for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
}
- Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+ Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
assertEquals(expectedMap, localMap);
@@ -95,6 +94,8 @@ public class TestTaskAssignmentManager {
}
@Test public void testDeleteMappings() throws Exception {
+ MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+ new MockCoordinatorStreamSystemFactory();
MockCoordinatorStreamSystemProducer producer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
MockCoordinatorStreamSystemConsumer consumer =
@@ -109,23 +110,23 @@ public class TestTaskAssignmentManager {
assertTrue(producer.isStarted());
assertTrue(consumer.isStarted());
- Map<String, Integer> expectedMap =
- new HashMap<String, Integer>() {
+ Map<String, String> expectedMap =
+ new HashMap<String, String>() {
{
- this.put("Task0", new Integer(0));
- this.put("Task1", new Integer(1));
+ this.put("Task0", "0");
+ this.put("Task1", "1");
}
};
- for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) {
+ for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
}
- Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+ Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
assertEquals(expectedMap, localMap);
taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());
- Map<String, Integer> deletedMap = taskAssignmentManager.readTaskAssignment();
+ Map<String, String> deletedMap = taskAssignmentManager.readTaskAssignment();
assertTrue(deletedMap.isEmpty());
taskAssignmentManager.stop();
@@ -134,6 +135,8 @@ public class TestTaskAssignmentManager {
}
@Test public void testTaskAssignmentManagerEmptyCoordinatorStream() throws Exception {
+ MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+ new MockCoordinatorStreamSystemFactory();
MockCoordinatorStreamSystemProducer producer =
mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
MockCoordinatorStreamSystemConsumer consumer =
@@ -148,8 +151,8 @@ public class TestTaskAssignmentManager {
assertTrue(producer.isStarted());
assertTrue(consumer.isStarted());
- Map<String, Integer> expectedMap = new HashMap<>();
- Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+ Map<String, String> expectedMap = new HashMap<>();
+ Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
assertEquals(expectedMap, localMap);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
index 3b184d3..9369f4b 100644
--- a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
+++ b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
@@ -46,17 +46,17 @@ public class ContainerMocks {
}
j += taskCountPerContainer[i];
- models.add(createContainerModel(i, partitions));
+ models.add(createContainerModel(String.valueOf(i), partitions));
}
return models;
}
- public static Map<String, Integer> generateTaskAssignments(int numContainers, int taskCount) {
- Map<String, Integer> mapping = new HashMap<>(taskCount);
+ public static Map<String, String> generateTaskAssignments(int numContainers, int taskCount) {
+ Map<String, String> mapping = new HashMap<>(taskCount);
Set<ContainerModel> containers = generateContainerModels(numContainers, taskCount);
for (ContainerModel container : containers) {
for (TaskName taskName : container.getTasks().keySet()) {
- mapping.put(taskName.getTaskName(), container.getContainerId());
+ mapping.put(taskName.getTaskName(), container.getProcessorId());
}
}
return mapping;
@@ -73,12 +73,12 @@ public class ContainerMocks {
return newTaskCountPerContainer;
}
- public static ContainerModel createContainerModel(int containerId, int[] partitions) {
+ public static ContainerModel createContainerModel(String containerId, int[] partitions) {
Map<TaskName, TaskModel> tasks = new HashMap<>();
for (int partition : partitions) {
tasks.put(getTaskName(partition), getTaskModel(partition));
}
- return new ContainerModel(containerId, tasks);
+ return new ContainerModel(containerId, -1, tasks);
}
public static Set<TaskModel> generateTaskModels(int[] partitions) {
@@ -117,11 +117,11 @@ public class ContainerMocks {
return values;
}
- public static Map<String, Integer> generateTaskContainerMapping(Set<ContainerModel> containers) {
- Map<String, Integer> taskMapping = new HashMap<>();
+ public static Map<String, String> generateTaskContainerMapping(Set<ContainerModel> containers) {
+ Map<String, String> taskMapping = new HashMap<>();
for (ContainerModel container : containers) {
for (TaskName taskName : container.getTasks().keySet()) {
- taskMapping.put(taskName.getTaskName(), container.getContainerId());
+ taskMapping.put(taskName.getTaskName(), container.getProcessorId());
}
}
return taskMapping;
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 2c64598..505acac 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -19,12 +19,7 @@
package org.apache.samza.serializers.model;
-import static org.junit.Assert.assertEquals;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
+import junit.framework.Assert;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -34,12 +29,22 @@ import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.SystemStreamPartition;
import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
public class TestSamzaObjectMapper {
- @Test
- public void testJsonTaskModel() throws Exception {
- ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+ private JobModel jobModel;
+
+ @Before
+ public void setup() throws IOException {
Map<String, String> configMap = new HashMap<String, String>();
Set<SystemStreamPartition> ssp = new HashSet<>();
configMap.put("a", "b");
@@ -49,12 +54,42 @@ public class TestSamzaObjectMapper {
TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2));
Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
tasks.put(taskName, taskModel);
- ContainerModel containerModel = new ContainerModel(1, tasks);
- Map<Integer, ContainerModel> containerMap = new HashMap<Integer, ContainerModel>();
- containerMap.put(Integer.valueOf(1), containerModel);
- JobModel jobModel = new JobModel(config, containerMap);
+ ContainerModel containerModel = new ContainerModel("1", 1, tasks);
+ Map<String, ContainerModel> containerMap = new HashMap<String, ContainerModel>();
+ containerMap.put("1", containerModel);
+ jobModel = new JobModel(config, containerMap);
+ }
+
+ @Test
+ public void testJsonTaskModel() throws Exception {
+ ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+
String str = mapper.writeValueAsString(jobModel);
JobModel obj = mapper.readValue(str, JobModel.class);
assertEquals(jobModel, obj);
}
+
+ /**
+ * Critical test to guarantee compatibility between samza 0.12 container models and 0.13+
+ *
+ * Samza 0.12 contains only "container-id" (integer) in the ContainerModel. "processor-id" (String) is added in 0.13.
+ * When serializing, we serialize both the fields in 0.13. Deserialization correctly handles the fields in 0.13.
+ *
+ * The test deserializes one job model JSON that carries both fields and one that carries only "container-id",
+ * and asserts that the two resulting JobModels are equal.
+ *
+ * @throws Exception if either JSON string cannot be deserialized; the exception is deliberately propagated
+ * (not caught and printed) so that a deserialization failure fails the test instead of passing silently
+ */
+ @Test
+ public void testContainerModelCompatible() throws Exception {
+ String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+ ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+ // Named distinctly from the jobModel field populated in setup() to avoid shadowing it.
+ JobModel newFormatModel = mapper.readValue(newJobModelString, JobModel.class);
+
+ String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+ ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
+ JobModel oldFormatModel = mapper1.readValue(oldJobModelString, JobModel.class);
+
+ Assert.assertEquals(newFormatModel, oldFormatModel);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index c0c0e6a..f5da9da 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -18,17 +18,13 @@
*/
package org.apache.samza.zk;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import junit.framework.Assert;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.junit.After;
import org.junit.AfterClass;
@@ -36,6 +32,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class TestZkBarrierForVersionUpgrade {
private static EmbeddedZookeeper zkServer = null;
@@ -190,6 +191,5 @@ public class TestZkBarrierForVersionUpgrade {
}
});
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
-
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index fb31054..e4f8944 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -531,7 +531,6 @@ public class TestZkLeaderElector {
private ZkUtils getZkUtilsWithNewClient(String processorId) {
ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
return new ZkUtils(
- processorId,
KEY_BUILDER,
ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
CONNECTION_TIMEOUT_MS);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index fba6d0f..749c674 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,9 +18,6 @@
*/
package org.apache.samza.zk;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
@@ -37,6 +34,10 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+
public class TestZkUtils {
private static EmbeddedZookeeper zkServer = null;
private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -67,7 +68,6 @@ public class TestZkUtils {
}
zkUtils = new ZkUtils(
- "testProcessorId",
KEY_BUILDER,
zkClient,
SESSION_TIMEOUT_MS);
@@ -188,7 +188,7 @@ public class TestZkUtils {
// create job model
Map<String, String> configMap = new HashMap<>();
- Map<Integer, ContainerModel> containers = new HashMap<>();
+ Map<String, ContainerModel> containers = new HashMap<>();
MapConfig config = new MapConfig(configMap);
JobModel jobModel = new JobModel(config, containers);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index bb72b72..010ff7e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,7 +19,6 @@
package org.apache.samza.container
-import java.lang.Thread.UncaughtExceptionHandler
import java.util
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
@@ -29,37 +28,24 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.config.{Config, MapConfig}
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.serializers._
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
+import org.apache.samza.serializers.SerdeManager
import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.ClosableTask
-import org.apache.samza.task.InitableTask
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskContext
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemConsumer, SystemConsumers, SystemProducer, SystemProducers, SystemStream, SystemStreamPartition}
+import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator, TaskInstanceCollector}
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
import org.junit.Assert._
import org.junit.Test
-import org.mockito.Mockito._
+import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
+import org.mockito.Mockito.when
+import scala.collection.JavaConversions._
class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@Test
@@ -71,9 +57,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
val containers = Map(
- Integer.valueOf(0) -> new ContainerModel(0, tasks.asJava),
- Integer.valueOf(1) -> new ContainerModel(1, tasks.asJava))
- val jobModel = new JobModel(config, containers.asJava)
+ "0" -> new ContainerModel("0", 0, tasks),
+ "1" -> new ContainerModel("1", 0, tasks))
+ val jobModel = new JobModel(config, containers)
def jobModelGenerator(): JobModel = jobModel
val server = new HttpServer
val coordinator = new JobModelManager(jobModel, server)
@@ -96,9 +82,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
val containers = Map(
- Integer.valueOf(0) -> new ContainerModel(0, tasks.asJava),
- Integer.valueOf(1) -> new ContainerModel(1, tasks.asJava))
- val jobModel = new JobModel(config, containers.asJava)
+ "0" -> new ContainerModel("0", 0, tasks),
+ "1" -> new ContainerModel("1", 1, tasks))
+ val jobModel = new JobModel(config, containers)
def jobModelGenerator(): JobModel = jobModel
val server = new HttpServer
val coordinator = new JobModelManager(jobModel, server)
@@ -126,12 +112,12 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets.keySet(), new Partition(2)),
new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets.keySet(), new Partition(3)),
new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets.keySet(), new Partition(4)))
- val containerModel1 = new ContainerModel(0, tasksForContainer1.asJava)
- val containerModel2 = new ContainerModel(1, tasksForContainer2.asJava)
+ val containerModel1 = new ContainerModel("0", 0, tasksForContainer1)
+ val containerModel2 = new ContainerModel("1", 1, tasksForContainer2)
val containers = Map(
- Integer.valueOf(0) -> containerModel1,
- Integer.valueOf(1) -> containerModel2)
- val jobModel = new JobModel(config, containers.asJava)
+ "0" -> containerModel1,
+ "1" -> containerModel2)
+ val jobModel = new JobModel(config, containers)
assertEquals(jobModel.maxChangeLogStreamPartitions, 5)
}
@@ -179,7 +165,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -238,7 +224,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -287,7 +273,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
Map[String, SystemProducer](),
new SerdeManager)
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
val mockTaskStorageManager = mock[TaskStorageManager]
when(mockTaskStorageManager.init).thenAnswer(new Answer[String] {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 086eb85..40974a6 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -70,7 +70,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
val taskInstance: TaskInstance = new TaskInstance(
task,
taskName,
@@ -165,7 +165,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
val registry = new MetricsRegistryMap
val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -222,7 +222,7 @@ class TestTaskInstance {
val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
val taskName = new TaskName("taskName")
val collector = new TaskInstanceCollector(producerMultiplexer)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
val registry = new MetricsRegistryMap
val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -281,7 +281,7 @@ class TestTaskInstance {
val metrics = new TaskInstanceMetrics()
val taskName = new TaskName("Offset Reset Task 0")
val collector = new TaskInstanceCollector(producers)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
val offsetManager = new OffsetManager()
@@ -316,7 +316,7 @@ class TestTaskInstance {
val metrics = new TaskInstanceMetrics()
val taskName = new TaskName("testing")
val collector = new TaskInstanceCollector(producers)
- val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+ val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
val offsetManager = new OffsetManager()
offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
val systemAdmins = Map("system" -> new MockSystemAdmin)
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index e7397bb..0b6dd8b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -79,8 +79,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
val container1Tasks = Map(
task1Name -> new TaskModel(task1Name, checkpoint1.keySet.asJava, new Partition(3)))
val containers = Map(
- Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava),
- Integer.valueOf(1) -> new ContainerModel(1, container1Tasks.asJava))
+ "0" -> new ContainerModel("0", 0, container0Tasks.asJava),
+ "1" -> new ContainerModel("1", 1, container1Tasks.asJava))
// The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
@@ -151,9 +151,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
val container1Tasks = Map(
task1Name -> new TaskModel(task1Name, ssp1.asJava, new Partition(3)))
val containers = Map(
- Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava),
- Integer.valueOf(1) -> new ContainerModel(1, container1Tasks.asJava))
-
+ Integer.valueOf(0) -> new ContainerModel("0", 0, container0Tasks.asJava),
+ Integer.valueOf(1) -> new ContainerModel("1", 1, container1Tasks.asJava))
val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
// Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
@@ -211,7 +210,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
val container0Tasks = Map(
task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0)))
val containers = Map(
- Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava))
+ "0" -> new ContainerModel("0", 0, container0Tasks.asJava))
val jobModel = new JobModel(config, containers.asJava)
assertEquals(config, coordinator.jobModel.getConfig)
assertEquals(jobModel, coordinator.jobModel)
@@ -233,7 +232,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0)))
val containers = Map(
- Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava))
+ "0" -> new ContainerModel("0", 0, container0Tasks.asJava))
val jobModel = new JobModel(config, containers.asJava)
assertEquals(config, coordinator.jobModel.getConfig)
assertEquals(jobModel, coordinator.jobModel)
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
index 47e1b0a..4df53fd 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -33,7 +33,7 @@ class TestShellCommandBuilder {
val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
val scb = new ShellCommandBuilder
scb.setConfig(config)
- scb.setId(1)
+ scb.setId("1")
scb.setUrl(new URL(urlStr))
val command = scb.buildCommand
val environment = scb.buildEnvironment
@@ -49,7 +49,7 @@ class TestShellCommandBuilder {
val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
val scb = new ShellCommandBuilder
scb.setConfig(config)
- scb.setId(1)
+ scb.setId("1")
scb.setUrl(new URL(urlStr))
val command = scb.buildCommand
assertEquals("foo", command)
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index f570422..8e8cc31 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -66,7 +66,7 @@ public class RocksDbKeyValueReader {
ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
taskNameList.add(new TaskName("read-rocks-db"));
SamzaContainerContext samzaContainerContext =
- new SamzaContainerContext(0, config, taskNameList);
+ new SamzaContainerContext("0", config, taskNameList);
Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
// open the db
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
index f1225ec..94e8370 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
@@ -36,7 +36,7 @@ public class Task {
private String taskName;
// containerId of the samza container in which the task is running
- private int containerId;
+ private String containerId;
// list of partitions that belong to the task.
private List<Partition> partitions;
@@ -49,7 +49,7 @@ public class Task {
public Task(@JsonProperty("preferredHost") String preferredHost,
@JsonProperty("taskName") String taskName,
- @JsonProperty("containerId") int containerId,
+ @JsonProperty("containerId") String containerId,
@JsonProperty("partitions") List<Partition> partitions,
@JsonProperty("storeNames") List<String> storeNames) {
this.preferredHost = preferredHost;
@@ -67,11 +67,11 @@ public class Task {
this.preferredHost = preferredHost;
}
- public int getContainerId() {
+ public String getContainerId() {
return containerId;
}
- public void setContainerId(int containerId) {
+ public void setContainerId(String containerId) {
this.containerId = containerId;
}
@@ -110,7 +110,7 @@ public class Task {
Task task = (Task) o;
- if (containerId != task.containerId) {
+ if (containerId == null ? task.containerId != null : !containerId.equals(task.containerId)) {
return false;
}
if (!preferredHost.equals(task.preferredHost)) {
@@ -129,7 +129,7 @@ public class Task {
public int hashCode() {
int result = preferredHost.hashCode();
result = 31 * result + taskName.hashCode();
- result = 31 * result + containerId;
+ result = 31 * result + containerId.hashCode();
result = 31 * result + partitions.hashCode();
result = 31 * result + storeNames.hashCode();
return result;
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index 27c88e5..c40c168 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -91,10 +91,10 @@ public class SamzaTaskProxy implements TaskProxy {
StorageConfig storageConfig = new StorageConfig(jobModel.getConfig());
List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
- Map<Integer, String> containerLocality = jobModel.getAllContainerLocality();
+ Map<String, String> containerLocality = jobModel.getAllContainerLocality();
List<Task> tasks = new ArrayList<>();
for (ContainerModel containerModel : jobModel.getContainers().values()) {
- int containerId = containerModel.getContainerId();
+ String containerId = containerModel.getProcessorId();
String host = containerLocality.get(containerId);
for (TaskModel taskModel : containerModel.getTasks().values()) {
String taskName = taskModel.getTaskName().getTaskName();
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index 02ec321..d0b6962 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -64,7 +64,7 @@ public class TestLocalStoreMonitor {
// Set default return values for methods.
Mockito.when(jobsClientMock.getJobStatus(Mockito.any()))
.thenReturn(JobStatus.STOPPED);
- Task task = new Task("localHost", "test-task", 0,
+ Task task = new Task("localHost", "test-task", "0",
new ArrayList<>(), ImmutableList.of("test-store"));
Mockito.when(jobsClientMock.getTasks(Mockito.any()))
.thenReturn(ImmutableList.of(task));
@@ -136,7 +136,7 @@ public class TestLocalStoreMonitor {
// TODO: Fix in SAMZA-1183
//@Test
public void shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost() throws Exception {
- Task task = new Task("notLocalHost", "test-task", 0,
+ Task task = new Task("notLocalHost", "test-task", "0",
new ArrayList<>(), ImmutableList.of("test-store"));
Mockito.when(jobsClientMock.getTasks(Mockito.any()))
.thenReturn(ImmutableList.of(task));
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
index 45f252a..de741ba 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
@@ -41,11 +41,11 @@ public class MockTaskProxy extends SamzaTaskProxy {
new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME, new Partition(PARTITION_ID)));
public static final String TASK_1_NAME = "Task1";
- public static final int TASK_1_CONTAINER_ID = 1;
+ public static final String TASK_1_CONTAINER_ID = "1";
public static final Partition CHANGE_LOG_PARTITION = new Partition(0);
public static final String TASK_2_NAME = "Task2";
- public static final int TASK_2_CONTAINER_ID = 2;
+ public static final String TASK_2_CONTAINER_ID = "2";
public MockTaskProxy() {
super(new TaskResourceConfig(new MapConfig()),
@@ -60,10 +60,10 @@ public class MockTaskProxy extends SamzaTaskProxy {
}
TaskModel task1Model = new TaskModel(new TaskName(TASK_1_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
TaskModel task2Model = new TaskModel(new TaskName(TASK_2_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
- ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID,
+ ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID, 1,
ImmutableMap.of(new TaskName(TASK_1_NAME),
task1Model));
- ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID,
+ ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID, 2,
ImmutableMap.of(new TaskName(TASK_2_NAME),
task2Model));
return new JobModel(new MapConfig(), ImmutableMap.of(TASK_1_CONTAINER_ID, task1ContainerModel,
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 5d1b497..8e853b7 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -116,7 +116,7 @@ object TestKeyValuePerformance extends Logging {
new TaskInstanceCollector(producerMultiplexer),
new MetricsRegistryMap,
null,
- new SamzaContainerContext(0, config, taskNames)
+ new SamzaContainerContext("0", config, taskNames)
)
val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 2c44aea..070e7a7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -65,11 +65,11 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
final String outputTopic = "output";
final int messageCount = 20;
- final Config configs = new MapConfig(createConfigs(testSystem, inputTopic, outputTopic, messageCount));
+ final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
// Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
// TopicExistsException since StreamProcessor auto-creates them.
createTopics(inputTopic, outputTopic);
- final StreamProcessor processor = new StreamProcessor(1, new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new);
+ final StreamProcessor processor = new StreamProcessor(new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new);
produceMessages(inputTopic, messageCount);
run(processor, endLatch);
@@ -86,10 +86,10 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
final String outputTopic = "output2";
final int messageCount = 20;
- final Config configs = new MapConfig(createConfigs(testSystem, inputTopic, outputTopic, messageCount));
+ final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
createTopics(inputTopic, outputTopic);
final StreamTaskFactory stf = IdentityStreamTask::new;
- final StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), stf);
+ final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf);
produceMessages(inputTopic, messageCount);
run(processor, endLatch);
@@ -106,11 +106,11 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
final String outputTopic = "output3";
final int messageCount = 20;
- final Config configs = new MapConfig(createConfigs(testSystem, inputTopic, outputTopic, messageCount));
+ final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
final ExecutorService executorService = Executors.newSingleThreadExecutor();
createTopics(inputTopic, outputTopic);
final AsyncStreamTaskFactory stf = () -> new AsyncStreamTaskAdapter(new IdentityStreamTask(), executorService);
- final StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), stf);
+ final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf);
produceMessages(inputTopic, messageCount);
run(processor, endLatch);
@@ -128,11 +128,11 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
final String outputTopic = "output4";
final int messageCount = 20;
- final Map<String, String> configMap = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
+ final Map<String, String> configMap = createConfigs("1", testSystem, inputTopic, outputTopic, messageCount);
configMap.remove("task.class");
final Config configs = new MapConfig(configMap);
- StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), (StreamTaskFactory) null);
+ StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), (StreamTaskFactory) null);
run(processor, endLatch);
}
@@ -141,7 +141,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
TestUtils.createTopic(zkUtils(), outputTopic, 1, 1, servers(), new Properties());
}
- private Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic, int messageCount) {
+ private Map<String, String> createConfigs(String processorId, String testSystem, String inputTopic, String outputTopic, int messageCount) {
Map<String, String> configs = new HashMap<>();
configs.putAll(
StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask"));
@@ -152,6 +152,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
configs.put("app.outputTopic", outputTopic);
configs.put("app.outputSystem", testSystem);
configs.put(ZkConfig.ZK_CONNECT, zkConnect());
+ configs.put("processor.id", processorId);
return configs;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 7a107f6..cda2690 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -35,7 +35,7 @@ import org.apache.samza.Partition
import org.apache.samza.checkpoint.Checkpoint
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.JaasUtils
-import org.apache.samza.config.{Config, KafkaProducerConfig, MapConfig}
+import org.apache.samza.config.{ApplicationConfig, Config, KafkaProducerConfig, MapConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.job.local.ThreadJobFactory
import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
@@ -81,6 +81,7 @@ object StreamTaskTestUtil {
var jobConfig = Map(
"job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
"job.coordinator.system" -> "kafka",
+ ApplicationConfig.PROCESSOR_ID -> "1",
"task.inputs" -> "kafka.input",
"serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory",
"systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
index 7e563f1..db67de6 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -49,7 +49,7 @@ public class YarnAppState {
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
- public Map<Integer, YarnContainer> runningYarnContainers = new ConcurrentHashMap<Integer, YarnContainer>() ;
+ public Map<String, YarnContainer> runningYarnContainers = new ConcurrentHashMap<String, YarnContainer>() ;
public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index ae171c7..96a4488 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -63,7 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler {
- private final int INVALID_YARN_CONTAINER_ID = -1;
+ private final String INVALID_YARN_CONTAINER_ID = "-1";
/**
* The containerProcessManager instance to request resources from yarn.
@@ -264,8 +264,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
@Override
public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
- int containerID = Integer.parseInt(containerIDStr);
- log.info("Received launch request for {} on hostname {}", containerID , resource.getHost());
+ log.info("Received launch request for {} on hostname {}", containerIDStr , resource.getHost());
synchronized (lock) {
Container container = allocatedResources.get(resource);
@@ -274,8 +273,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
return;
}
- state.runningYarnContainers.put(containerID, new YarnContainer(container));
- yarnContainerRunner.runContainer(containerID, container, builder);
+ state.runningYarnContainers.put(containerIDStr, new YarnContainer(container));
+ yarnContainerRunner.runContainer(containerIDStr, container, builder);
}
}
@@ -290,10 +289,10 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
//In that case, this scan will turn into a lookup. This change will require changes/testing in the UI files because
//those UI stub templates operate on the YarnContainer object.
- private int getIDForContainer(String lookupContainerId) {
- int samzaContainerID = INVALID_YARN_CONTAINER_ID;
- for(Map.Entry<Integer, YarnContainer> entry : state.runningYarnContainers.entrySet()) {
- Integer key = entry.getKey();
+ private String getIDForContainer(String lookupContainerId) {
+ String samzaContainerID = INVALID_YARN_CONTAINER_ID;
+ for(Map.Entry<String, YarnContainer> entry : state.runningYarnContainers.entrySet()) {
+ String key = entry.getKey();
YarnContainer yarnContainer = entry.getValue();
String yarnContainerId = yarnContainer.id().toString();
if(yarnContainerId.equals(lookupContainerId)) {
@@ -319,7 +318,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
synchronized (lock) {
AMRMClient.ContainerRequest containerRequest = requestsMap.get(request);
if (containerRequest == null) {
- log.info("Cancellation of {} already done. ", containerRequest);
+ log.info("Cancellation of {} already done. ", request);
return;
}
requestsMap.remove(request);
@@ -386,12 +385,12 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
SamzaResourceStatus samzaResrcStatus = new SamzaResourceStatus(status.getContainerId().toString(), status.getDiagnostics(), status.getExitStatus());
samzaResrcStatuses.add(samzaResrcStatus);
- int completedContainerID = getIDForContainer(status.getContainerId().toString());
+ String completedContainerID = getIDForContainer(status.getContainerId().toString());
log.info("Completed container had ID: {}", completedContainerID);
//remove the container from the list of running containers, if failed with a non-zero exit code, add it to the list of
//failed containers.
- if(completedContainerID != INVALID_YARN_CONTAINER_ID){
+ if(!completedContainerID.equals(INVALID_YARN_CONTAINER_ID)){
if(state.runningYarnContainers.containsKey(completedContainerID)) {
log.info("Removing container ID {} from completed containers", completedContainerID);
state.runningYarnContainers.remove(completedContainerID);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index c45fc7f..84ded62 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -89,7 +89,7 @@ public class YarnContainerRunner {
* @throws SamzaContainerLaunchException when there's an exception in submitting the request to the RM.
*
*/
- public void runContainer(int samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
+ public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
String containerIdStr = ConverterUtils.toString(container.getId());
log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
@@ -229,7 +229,7 @@ public class YarnContainerRunner {
* @param samzaContainerId the Samza container Id for logging purposes.
* @param env the Map of environment variables to their respective values.
*/
- private void printContainerEnvironmentVariables(int samzaContainerId, Map<String, String> env) {
+ private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : env.entrySet()) {
sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 313de94..c1b1302 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -151,9 +151,9 @@ public class YarnJobValidationTool {
public void validateJmxMetrics() throws Exception {
JobModelManager jobModelManager = JobModelManager.apply(config);
validator.init(config);
- Map<Integer, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
- for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) {
- Integer containerId = entry.getKey();
+ Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+ for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
+ String containerId = entry.getKey();
String jmxUrl = entry.getValue();
log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
[2/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza
Posted by na...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index b88753f..e3e21e9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,6 +19,7 @@
package org.apache.samza.zk;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.JobCoordinatorConfig;
@@ -28,9 +29,11 @@ import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
@@ -51,7 +54,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private static final String JOB_MODEL_VERSION_BARRIER = "JobModelVersion";
private final ZkUtils zkUtils;
- private final int processorId;
+ private final String processorId;
+
private final ZkController zkController;
private final SamzaContainerController containerController;
private final ScheduleAfterDebounceTime debounceTimer;
@@ -64,14 +68,22 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private String newJobModelVersion; // version published in ZK (by the leader)
private JobModel jobModel;
- public ZkJobCoordinator(int processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
- SamzaContainerController containerController) {
+ public ZkJobCoordinator(String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+ SamzaContainerController containerController) {
this.zkUtils = zkUtils;
this.keyBuilder = zkUtils.getKeyBuilder();
this.debounceTimer = debounceTimer;
- this.processorId = processorId;
+ ApplicationConfig appConfig = new ApplicationConfig(config);
+ if (appConfig.getProcessorId() != null) { // TODO: This check to be removed after 0.13+
+ this.processorId = appConfig.getProcessorId();
+ } else {
+ ProcessorIdGenerator idGenerator =
+ ClassLoaderHelper.fromClassName(
+ new ApplicationConfig(config).getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+ this.processorId = idGenerator.generateProcessorId(config);
+ }
this.containerController = containerController;
- this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
+ this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
this.config = config;
this.coordinationUtils = Util.
<CoordinationServiceFactory>getObj(
@@ -90,7 +102,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
if (systemFactoryClassName == null) {
String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
- log.error(String.format(msg));
+ log.error(msg);
throw new SamzaException(msg);
}
SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
@@ -105,10 +117,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
zkController.register();
}
- public void cleanupZk() {
- zkUtils.deleteRoot();
- }
-
@Override
public void stop() {
zkController.stop();
@@ -123,7 +131,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
@Override
- public int getProcessorId() {
+ public String getProcessorId() {
return processorId;
}
@@ -204,14 +212,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
- StringBuilder sb = new StringBuilder();
- List<Integer> containerIds = new ArrayList<>();
+ List<String> containerIds = new ArrayList<>();
for (String processor : currentProcessors) {
- String zkProcessorId = keyBuilder.parseIdFromPath(processor);
- sb.append(zkProcessorId).append(",");
- containerIds.add(Integer.valueOf(zkProcessorId));
+ String zkProcessorId = ZkKeyBuilder.parseIdFromPath(processor);
+ containerIds.add(zkProcessorId);
}
- log.info("generate new job model: processorsIds: " + sb.toString());
+ log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
containerIds);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 915866d..6206baf 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -31,26 +31,22 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
/**
* Method to instantiate an implementation of JobCoordinator
*
- * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with
- * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+ * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
* @return An instance of IJobCoordinator
*/
@Override
- public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+ public JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController) {
JobConfig jobConfig = new JobConfig(config);
String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
ZkConfig zkConfig = new ZkConfig(config);
- String processorIdStr = String.valueOf(processorId);
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
return new ZkJobCoordinator(
- processorId,
"groupId", // TODO: Usage of groupId to be resolved in SAMZA-1173
- config,
+ config,
debounceTimer,
new ZkUtils(
- processorIdStr,
new ZkKeyBuilder(groupName),
zkClient,
zkConfig.getZkConnectionTimeoutMs()
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 7a9b4d5..d77aab2 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -64,13 +64,11 @@ public class ZkUtils {
private volatile String ephemeralPath = null;
private final ZkKeyBuilder keyBuilder;
private final int connectionTimeoutMs;
- private final String processorId;
- public ZkUtils(String processorId, ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+ public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
this.keyBuilder = zkKeyBuilder;
this.connectionTimeoutMs = connectionTimeoutMs;
this.zkClient = zkClient;
- this.processorId = processorId;
}
public void connect() throws ZkInterruptedException {
@@ -160,7 +158,7 @@ public class ZkUtils {
* @param dataListener describe this
*/
public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
- LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
+ LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
}
@@ -175,7 +173,7 @@ public class ZkUtils {
try {
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
- LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
+ LOG.info("jobModelAsString=" + jobModelStr);
zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
} catch (Exception e) {
@@ -190,7 +188,7 @@ public class ZkUtils {
* @return job model for this version
*/
public JobModel getJobModel(String jobModelVersion) {
- LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+ LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
JobModel jm;
@@ -218,7 +216,7 @@ public class ZkUtils {
public void publishJobModelVersion(String oldVersion, String newVersion) {
Stat stat = new Stat();
String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
- LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
+ LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
.getVersion() + ")");
if (currentVersion != null && !currentVersion.equals(oldVersion)) {
@@ -234,9 +232,8 @@ public class ZkUtils {
LOG.error(msg, e);
throw new SamzaException(msg);
}
- LOG.info("pid=" + processorId +
- " published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) + "(actual data version after update = " + stat.getVersion()
- + ")");
+ LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) +
+ "(actual data version after update = " + stat.getVersion() + ")");
}
@@ -257,14 +254,14 @@ public class ZkUtils {
* @param listener - will be called when a processor is added or removed.
*/
public void subscribeToProcessorChange(IZkChildListener listener) {
- LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
+ LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
}
public void deleteRoot() {
String rootPath = keyBuilder.getRootPath();
if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
- LOG.info("pid=" + processorId + " Deleteing root: " + rootPath);
+ LOG.info("Deleteing root: " + rootPath);
zkClient.deleteRecursive(rootPath);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index f505322..1397ed5 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -26,8 +26,7 @@ object ShellCommandConfig {
val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
/**
- * The ID for a container. This is an integer number between 0 and
- * <number of containers>.
+ * The ID for a container. This is a string representation that is unique to the runtime environment.
*/
val ENV_CONTAINER_ID = "SAMZA_CONTAINER_ID"
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 96a337c..aba0d17 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -81,7 +81,7 @@ object SamzaContainer extends Logging {
val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
- def getLocalityManager(containerId: Int, config: Config): LocalityManager = {
+ def getLocalityManager(containerId: String, config: Config): LocalityManager = {
val containerName = getSamzaContainerName(containerId)
val registryMap = new MetricsRegistryMap(containerName)
val coordinatorSystemProducer =
@@ -108,12 +108,12 @@ object SamzaContainer extends Logging {
classOf[JobModel])
}
- def getSamzaContainerName(containerId: Int): String = {
- "samza-container-%d" format containerId
+ def getSamzaContainerName(containerId: String): String = {
+ "samza-container-%s" format containerId
}
def apply(
- containerId: Int,
+ containerId: String,
containerModel: ContainerModel,
config: Config,
maxChangeLogStreamPartitions: Int,
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5e4677f..e39ea3b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -151,7 +151,7 @@ object JobModelManager extends Logging {
localityManager: LocalityManager,
streamMetadataCache: StreamMetadataCache,
streamPartitionCountMonitor: StreamPartitionCountMonitor,
- containerIds: java.util.List[Integer]) = {
+ containerIds: java.util.List[String]) = {
val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds)
jobModelRef.set(jobModel)
@@ -219,7 +219,7 @@ object JobModelManager extends Logging {
changeLogPartitionMapping: util.Map[TaskName, Integer],
localityManager: LocalityManager,
streamMetadataCache: StreamMetadataCache,
- containerIds: java.util.List[Integer]): JobModel = {
+ containerIds: java.util.List[String]): JobModel = {
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
val grouper = getSystemStreamPartitionGrouper(config)
@@ -258,7 +258,7 @@ object JobModelManager extends Logging {
case _ => containerGrouper.group(taskModels.asJava, containerIds)
}
}
- val containerMap = containerModels.asScala.map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
+ val containerMap = containerModels.asScala.map { case (containerModel) => containerModel.getProcessorId -> containerModel }.toMap
new JobModel(config, containerMap.asJava, localityManager)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 475df52..7a31567 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -64,7 +64,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
commandBuilder
.setConfig(config)
- .setId(0)
+ .setId("0")
.setUrl(coordinator.server.getUrl)
.setCommandPath(fwkPath)
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index f218543..dcef3af 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -42,10 +42,10 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
info("Creating a ThreadJob, which is only meant for debugging.")
val coordinator = JobModelManager(config)
val jobModel = coordinator.jobModel
- val containerModel = jobModel.getContainers.get(0)
+ val containerModel = jobModel.getContainers.get("0")
val jmxServer = new JmxServer
val streamApp = TaskFactoryUtil.createStreamApplication(config)
- val appRunner = new LocalContainerRunner(jobModel, 0)
+ val appRunner = new LocalContainerRunner(jobModel, "0")
val taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, appRunner)
// Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
@@ -58,7 +58,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
coordinator.start
new ThreadJob(
SamzaContainer(
- containerModel.getContainerId,
+ containerModel.getProcessorId,
containerModel,
config,
jobModel.maxChangeLogStreamPartitions,
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
index 6189fe7..109ed47 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
@@ -34,7 +34,7 @@ public class MockContainerAllocator extends ContainerAllocator {
}
@Override
- public void requestResources(Map<Integer, String> containerToHostMappings) {
+ public void requestResources(Map<String, String> containerToHostMappings) {
requestedContainers += containerToHostMappings.size();
super.requestResources(containerToHostMappings);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 5351bc3..989b82a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -98,10 +98,10 @@ public class TestContainerAllocator {
//That way it becomes easier to mock objects. Save it for later.
HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
- Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+ Map<String, ContainerModel> containers = new java.util.HashMap<>();
for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
- containers.put(i, container);
+ ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+ containers.put(String.valueOf(i), container);
}
JobModel jobModel = new JobModel(getConfig(), containers);
return new JobModelManager(jobModel, server, null);
@@ -130,12 +130,12 @@ public class TestContainerAllocator {
*/
@Test
public void testRequestContainers() throws Exception {
- Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+ Map<String, String> containersToHostMapping = new HashMap<String, String>() {
{
- put(0, "abc");
- put(1, "def");
- put(2, null);
- put(3, "abc");
+ put("0", "abc");
+ put("1", "def");
+ put("2", null);
+ put("3", "abc");
}
};
@@ -160,9 +160,9 @@ public class TestContainerAllocator {
@Test
public void testRequestContainersWithNoMapping() throws Exception {
int containerCount = 4;
- Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+ Map<String, String> containersToHostMapping = new HashMap<String, String>();
for (int i = 0; i < containerCount; i++) {
- containersToHostMapping.put(i, null);
+ containersToHostMapping.put(String.valueOf(i), null);
}
allocatorThread.start();
@@ -208,7 +208,7 @@ public class TestContainerAllocator {
allocatorThread.start();
- containerAllocator.requestResource(0, "abc");
+ containerAllocator.requestResource("0", "abc");
containerAllocator.addResource(resource);
containerAllocator.addResource(resource1);
@@ -245,11 +245,11 @@ public class TestContainerAllocator {
assertEquals(2, requestState.assignedRequests.size());
SamzaResourceRequest request = requestState.assignedRequests.remove();
- assertEquals(0, request.getContainerID());
+ assertEquals("0", request.getContainerID());
assertEquals("2", request.getPreferredHost());
request = requestState.assignedRequests.remove();
- assertEquals(0, request.getContainerID());
+ assertEquals("0", request.getContainerID());
assertEquals("ANY_HOST", request.getPreferredHost());
// This routine should be called after the retry is assigned, but before it's started.
@@ -261,7 +261,7 @@ public class TestContainerAllocator {
state.neededContainers.set(1);
requestState.registerContainerListener(listener);
- containerAllocator.requestResource(0, "2");
+ containerAllocator.requestResource("0", "2");
containerAllocator.addResource(container);
containerAllocator.addResource(container1);
allocatorThread.start();
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 0d61814..660012e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -87,15 +87,14 @@ public class TestContainerProcessManager {
private SamzaApplicationState state = null;
-
private JobModelManager getCoordinator(int containerCount) {
- Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+ Map<String, ContainerModel> containers = new java.util.HashMap<>();
for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
- containers.put(i, container);
+ ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+ containers.put(String.valueOf(i), container);
}
- Map<Integer, Map<String, String>> localityMap = new HashMap<>();
- localityMap.put(0, new HashMap<String, String>() { {
+ Map<String, Map<String, String>> localityMap = new HashMap<>();
+ localityMap.put("0", new HashMap<String, String>() { {
put(SetContainerHostMapping.HOST_KEY, "abc");
}
});
@@ -105,9 +104,7 @@ public class TestContainerProcessManager {
JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
JobModelManager.jobModelRef().getAndSet(jobModel);
- JobModelManager reader = new JobModelManager(jobModel, this.server, null);
-
- return reader;
+ return new JobModelManager(jobModel, this.server, null);
}
@Before
@@ -137,7 +134,8 @@ public class TestContainerProcessManager {
manager
);
- AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
+ AbstractContainerAllocator allocator =
+ (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
assertEquals(ContainerAllocator.class, allocator.getClass());
// Asserts that samza exposed container configs is honored by allocator thread
assertEquals(500, allocator.containerMemoryMb);
@@ -155,7 +153,8 @@ public class TestContainerProcessManager {
manager
);
- allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
+ allocator =
+ (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
// Asserts that samza exposed container configs is honored by allocator thread
assertEquals(500, allocator.containerMemoryMb);
@@ -244,6 +243,7 @@ public class TestContainerProcessManager {
assertTrue(taskManager.shouldShutdown());
}
+
/**
* Test Task Manager should request a new container when a task fails with unknown exit code
* When host-affinity is not enabled, it will always request for ANY_HOST
@@ -317,6 +317,7 @@ public class TestContainerProcessManager {
taskManager.stop();
}
+
/**
* Test AM requests a new container when a task fails
* Error codes with same behavior - Disk failure, preemption and aborted
@@ -416,8 +417,6 @@ public class TestContainerProcessManager {
taskManager1.onResourceAllocated(container2);
}
-
-
@After
public void teardown() {
server.stop();
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index 7a514e8..3d52510 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -40,7 +40,7 @@ public class TestContainerRequestState {
public void testUpdateRequestState() {
// Host-affinity is enabled
ResourceRequestState state = new ResourceRequestState(true, manager);
- SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
+ SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
state.addResourceRequest(request);
assertNotNull(manager.resourceRequests);
@@ -57,7 +57,7 @@ public class TestContainerRequestState {
// Host-affinity is not enabled
ResourceRequestState state1 = new ResourceRequestState(false, manager);
- SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, 1);
+ SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, "1");
state1.addResourceRequest(request1);
assertNotNull(manager.resourceRequests);
@@ -71,7 +71,6 @@ public class TestContainerRequestState {
}
-
/**
* Test addContainer() updates the state correctly
*/
@@ -102,7 +101,7 @@ public class TestContainerRequestState {
assertEquals(container1, state1.getResourcesOnAHost(ANY_HOST).get(0));
// Container Allocated on a Requested Host
- state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", 0));
+ state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", "0"));
assertEquals(1, state1.numPendingRequests());
@@ -143,9 +142,9 @@ public class TestContainerRequestState {
public void testContainerAssignment() throws Exception {
// Host-affinity enabled
ResourceRequestState state = new ResourceRequestState(true, manager);
- SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
+ SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
- SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", 0);
+ SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", "0");
state.addResourceRequest(request);
state.addResourceRequest(request1);
@@ -194,5 +193,4 @@ public class TestContainerRequestState {
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index b6651f2..83d31e2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -65,16 +65,15 @@ public class TestHostAwareContainerAllocator {
allocatorThread = new Thread(containerAllocator);
}
-
/**
* Test request containers with no containerToHostMapping makes the right number of requests
*/
@Test
public void testRequestContainersWithNoMapping() throws Exception {
int containerCount = 4;
- Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+ Map<String, String> containersToHostMapping = new HashMap<String, String>();
for (int i = 0; i < containerCount; i++) {
- containersToHostMapping.put(i, null);
+ containersToHostMapping.put(String.valueOf(i), null);
}
allocatorThread.start();
@@ -95,10 +94,10 @@ public class TestHostAwareContainerAllocator {
*/
@Test
public void testAddContainerWithHostAffinity() throws Exception {
- containerAllocator.requestResources(new HashMap<Integer, String>() {
+ containerAllocator.requestResources(new HashMap<String, String>() {
{
- put(0, "abc");
- put(1, "xyz");
+ put("0", "abc");
+ put("1", "xyz");
}
});
@@ -153,7 +152,7 @@ public class TestHostAwareContainerAllocator {
allocatorThread.start();
- containerAllocator.requestResource(0, "abc");
+ containerAllocator.requestResource("0", "abc");
containerAllocator.addResource(resource0);
containerAllocator.addResource(resource1);
@@ -162,17 +161,14 @@ public class TestHostAwareContainerAllocator {
listener.verify();
}
-
-
-
@Test
public void testRequestContainers() throws Exception {
- Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+ Map<String, String> containersToHostMapping = new HashMap<String, String>() {
{
- put(0, "abc");
- put(1, "def");
- put(2, null);
- put(3, "abc");
+ put("0", "abc");
+ put("1", "def");
+ put("2", null);
+ put("3", "abc");
}
};
@@ -221,11 +217,11 @@ public class TestHostAwareContainerAllocator {
assertEquals(2, requestState.assignedRequests.size());
SamzaResourceRequest request = requestState.assignedRequests.remove();
- assertEquals(0, request.getContainerID());
+ assertEquals("0", request.getContainerID());
assertEquals("2", request.getPreferredHost());
request = requestState.assignedRequests.remove();
- assertEquals(0, request.getContainerID());
+ assertEquals("0", request.getContainerID());
assertEquals("ANY_HOST", request.getPreferredHost());
// This routine should be called after the retry is assigned, but before it's started.
@@ -238,7 +234,7 @@ public class TestHostAwareContainerAllocator {
requestState.registerContainerListener(listener);
// Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
- containerAllocator.requestResource(0, "2");
+ containerAllocator.requestResource("0", "2");
containerAllocator.addResource(container1);
containerAllocator.addResource(container);
@@ -257,10 +253,10 @@ public class TestHostAwareContainerAllocator {
final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");
final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2");
- Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+ Map<String, String> containersToHostMapping = new HashMap<String, String>() {
{
- put(0, "abc");
- put(1, "def");
+ put("0", "abc");
+ put("1", "def");
}
};
containerAllocator.requestResources(containersToHostMapping);
@@ -315,7 +311,6 @@ public class TestHostAwareContainerAllocator {
containerAllocator.stop();
}
-
private static Config getConfig() {
Config config = new MapConfig(new HashMap<String, String>() {
{
@@ -344,10 +339,10 @@ public class TestHostAwareContainerAllocator {
//That way it becomes easier to mock objects. Save it for later.
HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
- Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+ Map<String, ContainerModel> containers = new java.util.HashMap<>();
for (int i = 0; i < containerCount; i++) {
- ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
- containers.put(i, container);
+ ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+ containers.put(String.valueOf(i), container);
}
JobModel jobModel = new JobModel(getConfig(), containers);
return new JobModelManager(jobModel, server, null);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index 5341141..07f721d 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -84,12 +84,12 @@ public class TestLocalityManager {
assertTrue(producer.isStarted());
assertTrue(consumer.isStarted());
- localityManager.writeContainerToHostMapping(0, "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
- Map<Integer, Map<String, String>> localMap = localityManager.readContainerLocality();
- Map<Integer, Map<String, String>> expectedMap =
- new HashMap<Integer, Map<String, String>>() {
+ localityManager.writeContainerToHostMapping("0", "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
+ Map<String, Map<String, String>> localMap = localityManager.readContainerLocality();
+ Map<String, Map<String, String>> expectedMap =
+ new HashMap<String, Map<String, String>>() {
{
- this.put(new Integer(0),
+ this.put("0",
new HashMap<String, String>() {
{
this.put(SetContainerHostMapping.HOST_KEY, "localhost");
@@ -118,7 +118,7 @@ public class TestLocalityManager {
localityManager.start();
assertTrue(producer.isStarted());
- localityManager.writeContainerToHostMapping(1, "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
+ localityManager.writeContainerToHostMapping("1", "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
try {
localityManager.readContainerLocality();
fail("Should have thrown UnsupportedOperationException");
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index 3fd39d7..de4de7c 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -18,16 +18,20 @@
*/
package org.apache.samza.container.grouper.task;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import org.apache.samza.SamzaException;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
import static org.apache.samza.container.mock.ContainerMocks.generateTaskContainerMapping;
import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels;
import static org.apache.samza.container.mock.ContainerMocks.getTaskModel;
@@ -35,12 +39,16 @@ import static org.apache.samza.container.mock.ContainerMocks.getTaskName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyCollection;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestGroupByContainerCount {
private TaskAssignmentManager taskAssignmentManager;
private LocalityManager localityManager;
-
@Before
public void setup() {
taskAssignmentManager = mock(TaskAssignmentManager.class);
@@ -73,18 +81,18 @@ public class TestGroupByContainerCount {
Set<ContainerModel> containers = new GroupByContainerCount(2).group(taskModels);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertEquals(2, container1.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -100,18 +108,18 @@ public class TestGroupByContainerCount {
Set<ContainerModel> containers = new GroupByContainerCount(2).group(taskModels);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(11, container0.getTasks().size());
assertEquals(10, container1.getTasks().size());
@@ -167,27 +175,27 @@ public class TestGroupByContainerCount {
public void testBalancerAfterContainerIncrease() {
Set<TaskModel> taskModels = generateTaskModels(9);
Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(4).balance(taskModels, localityManager);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(4, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
- ContainerModel container2 = containersMap.get(2);
- ContainerModel container3 = containersMap.get(3);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
+ ContainerModel container2 = containersMap.get("2");
+ ContainerModel container3 = containersMap.get("3");
assertNotNull(container0);
assertNotNull(container1);
assertNotNull(container2);
assertNotNull(container3);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertEquals(2, container1.getTasks().size());
assertEquals(2, container2.getTasks().size());
@@ -207,18 +215,18 @@ public class TestGroupByContainerCount {
assertTrue(container3.getTasks().containsKey(getTaskName(7)));
// Verify task mappings are saved
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 1);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "1");
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), 2);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), 2);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), "2");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), "2");
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 3);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), 3);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "3");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), "3");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -249,23 +257,23 @@ public class TestGroupByContainerCount {
public void testBalancerAfterContainerDecrease() {
Set<TaskModel> taskModels = generateTaskModels(9);
Set<ContainerModel> prevContainers = new GroupByContainerCount(4).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(5, container0.getTasks().size());
assertEquals(4, container1.getTasks().size());
@@ -284,16 +292,16 @@ public class TestGroupByContainerCount {
assertTrue(container1.getTasks().containsKey(getTaskName(3)));
// Verify task mappings are saved
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 1);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "1");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -327,24 +335,24 @@ public class TestGroupByContainerCount {
// Before
Set<TaskModel> taskModels = generateTaskModels(9);
Set<ContainerModel> prevContainers = new GroupByContainerCount(4).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
// First balance
Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(5, container0.getTasks().size());
assertEquals(4, container1.getTasks().size());
@@ -363,16 +371,16 @@ public class TestGroupByContainerCount {
assertTrue(container1.getTasks().containsKey(getTaskName(3)));
// Verify task mappings are saved
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 1);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "1");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
@@ -389,19 +397,19 @@ public class TestGroupByContainerCount {
containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(3, containers.size());
- container0 = containersMap.get(0);
- container1 = containersMap.get(1);
- ContainerModel container2 = containersMap.get(2);
+ container0 = containersMap.get("0");
+ container1 = containersMap.get("1");
+ ContainerModel container2 = containersMap.get("2");
assertNotNull(container0);
assertNotNull(container1);
assertNotNull(container2);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
- assertEquals(2, container2.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
+ assertEquals("2", container2.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertEquals(3, container1.getTasks().size());
assertEquals(3, container2.getTasks().size());
@@ -421,17 +429,17 @@ public class TestGroupByContainerCount {
assertTrue(container2.getTasks().containsKey(getTaskName(3)));
// Verify task mappings are saved
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(8).getTaskName(), 0);
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(8).getTaskName(), "0");
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(5).getTaskName(), 1);
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(7).getTaskName(), 1);
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(5).getTaskName(), "1");
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(7).getTaskName(), "1");
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(6).getTaskName(), 2);
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(2).getTaskName(), 2);
- verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(3).getTaskName(), 2);
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(6).getTaskName(), "2");
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(2).getTaskName(), "2");
+ verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(3).getTaskName(), "2");
verify(taskAssignmentManager2, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -459,23 +467,23 @@ public class TestGroupByContainerCount {
public void testBalancerAfterContainerSame() {
Set<TaskModel> taskModels = generateTaskModels(9);
Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(5, container0.getTasks().size());
assertEquals(4, container1.getTasks().size());
@@ -489,7 +497,7 @@ public class TestGroupByContainerCount {
assertTrue(container1.getTasks().containsKey(getTaskName(5)));
assertTrue(container1.getTasks().containsKey(getTaskName(7)));
- verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyInt());
+ verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyString());
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -520,32 +528,32 @@ public class TestGroupByContainerCount {
public void testBalancerAfterContainerSameCustomAssignment() {
Set<TaskModel> taskModels = generateTaskModels(9);
- Map<String, Integer> prevTaskToContainerMapping = new HashMap<>();
- prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(6).getTaskName(), 1);
- prevTaskToContainerMapping.put(getTaskName(7).getTaskName(), 1);
- prevTaskToContainerMapping.put(getTaskName(8).getTaskName(), 1);
+ Map<String, String> prevTaskToContainerMapping = new HashMap<>();
+ prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(6).getTaskName(), "1");
+ prevTaskToContainerMapping.put(getTaskName(7).getTaskName(), "1");
+ prevTaskToContainerMapping.put(getTaskName(8).getTaskName(), "1");
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(6, container0.getTasks().size());
assertEquals(3, container1.getTasks().size());
@@ -559,7 +567,7 @@ public class TestGroupByContainerCount {
assertTrue(container1.getTasks().containsKey(getTaskName(7)));
assertTrue(container1.getTasks().containsKey(getTaskName(8)));
- verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyInt());
+ verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyString());
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -589,32 +597,32 @@ public class TestGroupByContainerCount {
public void testBalancerAfterContainerSameCustomAssignmentAndContainerIncrease() {
Set<TaskModel> taskModels = generateTaskModels(6);
- Map<String, Integer> prevTaskToContainerMapping = new HashMap<>();
- prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), 0);
- prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), 1);
- prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), 1);
- prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), 1);
- prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), 1);
- prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), 1);
+ Map<String, String> prevTaskToContainerMapping = new HashMap<>();
+ prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), "0");
+ prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), "1");
+ prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), "1");
+ prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), "1");
+ prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), "1");
+ prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), "1");
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(3, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
- ContainerModel container2 = containersMap.get(2);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
+ ContainerModel container2 = containersMap.get("2");
assertNotNull(container0);
assertNotNull(container1);
assertNotNull(container2);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
- assertEquals(2, container2.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
+ assertEquals("2", container2.getProcessorId());
assertEquals(2, container0.getTasks().size());
assertEquals(2, container1.getTasks().size());
assertEquals(2, container1.getTasks().size());
@@ -626,12 +634,12 @@ public class TestGroupByContainerCount {
assertTrue(container2.getTasks().containsKey(getTaskName(4)));
assertTrue(container2.getTasks().containsKey(getTaskName(3)));
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 2);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 2);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "2");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "2");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "0");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -640,26 +648,26 @@ public class TestGroupByContainerCount {
public void testBalancerOldContainerCountOne() {
Set<TaskModel> taskModels = generateTaskModels(3);
Set<ContainerModel> prevContainers = new GroupByContainerCount(1).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
// Results should be the same as calling group()
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(3, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
- ContainerModel container2 = containersMap.get(2);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
+ ContainerModel container2 = containersMap.get("2");
assertNotNull(container0);
assertNotNull(container1);
assertNotNull(container2);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
- assertEquals(2, container2.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
+ assertEquals("2", container2.getProcessorId());
assertEquals(1, container0.getTasks().size());
assertEquals(1, container1.getTasks().size());
assertEquals(1, container2.getTasks().size());
@@ -669,9 +677,9 @@ public class TestGroupByContainerCount {
assertTrue(container2.getTasks().containsKey(getTaskName(2)));
// Verify task mappings are saved
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 2);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "2");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -680,30 +688,30 @@ public class TestGroupByContainerCount {
public void testBalancerNewContainerCountOne() {
Set<TaskModel> taskModels = generateTaskModels(3);
Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
// Results should be the same as calling group
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get(0);
+ ContainerModel container0 = containersMap.get("0");
assertNotNull(container0);
- assertEquals(0, container0.getContainerId());
+ assertEquals("0", container0.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
assertTrue(container0.getTasks().containsKey(getTaskName(1)));
assertTrue(container0.getTasks().containsKey(getTaskName(2)));
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -711,29 +719,29 @@ public class TestGroupByContainerCount {
@Test
public void testBalancerEmptyTaskMapping() {
Set<TaskModel> taskModels = generateTaskModels(3);
- when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, Integer>());
+ when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, String>());
Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
// Results should be the same as calling group
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get(0);
+ ContainerModel container0 = containersMap.get("0");
assertNotNull(container0);
- assertEquals(0, container0.getContainerId());
+ assertEquals("0", container0.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
assertTrue(container0.getTasks().containsKey(getTaskName(1)));
assertTrue(container0.getTasks().containsKey(getTaskName(2)));
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
}
@@ -743,30 +751,30 @@ public class TestGroupByContainerCount {
int taskCount = 3;
Set<TaskModel> taskModels = generateTaskModels(taskCount);
Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1));
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
// Results should be the same as calling group
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get(0);
+ ContainerModel container0 = containersMap.get("0");
assertNotNull(container0);
- assertEquals(0, container0.getContainerId());
+ assertEquals("0", container0.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
assertTrue(container0.getTasks().containsKey(getTaskName(1)));
assertTrue(container0.getTasks().containsKey(getTaskName(2)));
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
verify(taskAssignmentManager).deleteTaskContainerMappings(anyCollection());
}
@@ -776,30 +784,30 @@ public class TestGroupByContainerCount {
int taskCount = 3;
Set<TaskModel> taskModels = generateTaskModels(taskCount);
Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1));
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
// Results should be the same as calling group
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(1, containers.size());
- ContainerModel container0 = containersMap.get(0);
+ ContainerModel container0 = containersMap.get("0");
assertNotNull(container0);
- assertEquals(0, container0.getContainerId());
+ assertEquals("0", container0.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
assertTrue(container0.getTasks().containsKey(getTaskName(1)));
assertTrue(container0.getTasks().containsKey(getTaskName(2)));
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
- verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+ verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
verify(taskAssignmentManager).deleteTaskContainerMappings(anyCollection());
}
@@ -808,7 +816,7 @@ public class TestGroupByContainerCount {
public void testBalancerNewContainerCountGreaterThanTasks() {
Set<TaskModel> taskModels = generateTaskModels(3);
Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
new GroupByContainerCount(5).balance(taskModels, localityManager); // Should throw
@@ -818,7 +826,7 @@ public class TestGroupByContainerCount {
public void testBalancerEmptyTasks() {
Set<TaskModel> taskModels = generateTaskModels(3);
Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
new GroupByContainerCount(5).balance(new HashSet<TaskModel>(), localityManager); // Should throw
@@ -828,10 +836,25 @@ public class TestGroupByContainerCount {
public void testBalancerResultImmutable() {
Set<TaskModel> taskModels = generateTaskModels(3);
Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
- Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
containers.remove(containers.iterator().next());
}
+
+ @Test(expected = SamzaException.class)
+ public void testBalancerThrowsOnNonIntegerContainerIds() {
+ Set<TaskModel> taskModels = generateTaskModels(3);
+ Set<ContainerModel> prevContainers = new HashSet<>();
+ taskModels.forEach(model -> {
+ prevContainers.add(
+ new ContainerModel(UUID.randomUUID().toString(), -1, Collections.singletonMap(model.getTaskName(), model)));
+ });
+ Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+ when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
+
+ new GroupByContainerCount(3).balance(taskModels, localityManager); //Should throw
+
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
index 82f2b7a..62131fe 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
@@ -44,13 +44,11 @@ import static org.mockito.Mockito.when;
public class TestGroupByContainerIds {
- private TaskAssignmentManager taskAssignmentManager;
- private LocalityManager localityManager;
@Before
public void setup() {
- taskAssignmentManager = mock(TaskAssignmentManager.class);
- localityManager = mock(LocalityManager.class);
+ TaskAssignmentManager taskAssignmentManager = mock(TaskAssignmentManager.class);
+ LocalityManager localityManager = mock(LocalityManager.class);
when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager);
@@ -94,18 +92,18 @@ public class TestGroupByContainerIds {
Set<ContainerModel> containers = buildSimpleGrouper(2).group(taskModels);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(0);
- ContainerModel container1 = containersMap.get(1);
+ ContainerModel container0 = containersMap.get("0");
+ ContainerModel container1 = containersMap.get("1");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(0, container0.getContainerId());
- assertEquals(1, container1.getContainerId());
+ assertEquals("0", container0.getProcessorId());
+ assertEquals("1", container1.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertEquals(2, container1.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -119,27 +117,27 @@ public class TestGroupByContainerIds {
public void testGroupHappyPathWithListOfContainers() {
Set<TaskModel> taskModels = generateTaskModels(5);
- List<Integer> containerIds = new ArrayList<Integer>() {
+ List<String> containerIds = new ArrayList<String>() {
{
- add(4);
- add(2);
+ add("4");
+ add("2");
}
};
Set<ContainerModel> containers = buildSimpleGrouper().group(taskModels, containerIds);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(4);
- ContainerModel container1 = containersMap.get(2);
+ ContainerModel container0 = containersMap.get("4");
+ ContainerModel container1 = containersMap.get("2");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(4, container0.getContainerId());
- assertEquals(2, container1.getContainerId());
+ assertEquals("4", container0.getProcessorId());
+ assertEquals("2", container1.getProcessorId());
assertEquals(3, container0.getTasks().size());
assertEquals(2, container1.getTasks().size());
assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -154,28 +152,28 @@ public class TestGroupByContainerIds {
public void testGroupManyTasks() {
Set<TaskModel> taskModels = generateTaskModels(21);
- List<Integer> containerIds = new ArrayList<Integer>() {
+ List<String> containerIds = new ArrayList<String>() {
{
- add(4);
- add(2);
+ add("4");
+ add("2");
}
};
Set<ContainerModel> containers = buildSimpleGrouper().group(taskModels, containerIds);
- Map<Integer, ContainerModel> containersMap = new HashMap<>();
+ Map<String, ContainerModel> containersMap = new HashMap<>();
for (ContainerModel container : containers) {
- containersMap.put(container.getContainerId(), container);
+ containersMap.put(container.getProcessorId(), container);
}
assertEquals(2, containers.size());
- ContainerModel container0 = containersMap.get(4);
- ContainerModel container1 = containersMap.get(2);
+ ContainerModel container0 = containersMap.get("4");
+ ContainerModel container1 = containersMap.get("2");
assertNotNull(container0);
assertNotNull(container1);
- assertEquals(4, container0.getContainerId());
- assertEquals(2, container1.getContainerId());
+ assertEquals("4", container0.getProcessorId());
+ assertEquals("2", container1.getProcessorId());
assertEquals(11, container0.getTasks().size());
assertEquals(10, container1.getTasks().size());
[3/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza
Posted by na...@apache.org.
SAMZA-1126 - Semantics of processorId in Samza
Implementation based on [SEP-1](https://cwiki.apache.org/confluence/display/SAMZA/SEP-1%3A+Semantics+of+ProcessorId+in+Samza)
Author: navina <na...@apache.org>
Reviewers: Yi Pan <ni...@gmail.com>, Jacob Maes <jm...@linkedin.com>, Jagadish <ja...@apache.org>
Closes #103 from navina/SAMZA-1126
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7da1840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7da1840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7da1840
Branch: refs/heads/master
Commit: a7da1840f17a182b49c30451f35991c97fc51068
Parents: c74722b
Author: Navina Ramesh <na...@apache.org>
Authored: Fri Apr 7 15:22:13 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Fri Apr 7 15:22:13 2017 -0700
----------------------------------------------------------------------
.../samza/container/SamzaContainerContext.java | 4 +-
.../org/apache/samza/job/CommandBuilder.java | 4 +-
.../samza/runtime/ProcessorIdGenerator.java | 51 +++
.../AbstractContainerAllocator.java | 12 +-
.../clustermanager/ContainerProcessManager.java | 19 +-
.../HostAwareContainerAllocator.java | 2 +-
.../clustermanager/SamzaApplicationState.java | 2 +-
.../clustermanager/SamzaResourceRequest.java | 6 +-
.../apache/samza/config/ApplicationConfig.java | 60 +++
.../apache/samza/container/LocalityManager.java | 12 +-
.../grouper/task/GroupByContainerCount.java | 43 ++-
.../grouper/task/GroupByContainerIds.java | 10 +-
.../task/SingleContainerGrouperFactory.java | 8 +-
.../grouper/task/TaskAssignmentManager.java | 10 +-
.../container/grouper/task/TaskNameGrouper.java | 2 +-
.../samza/coordinator/JobCoordinator.java | 22 +-
.../coordinator/JobCoordinatorFactory.java | 3 +-
.../messages/SetTaskContainerMapping.java | 7 +-
.../apache/samza/job/model/ContainerModel.java | 31 +-
.../org/apache/samza/job/model/JobModel.java | 26 +-
.../processor/SamzaContainerController.java | 20 +-
.../apache/samza/processor/StreamProcessor.java | 53 +--
.../samza/runtime/LocalContainerRunner.java | 10 +-
.../org/apache/samza/runtime/UUIDGenerator.java | 41 ++
.../model/JsonContainerModelMixIn.java | 9 +-
.../serializers/model/JsonJobModelMixIn.java | 4 +-
.../serializers/model/SamzaObjectMapper.java | 30 +-
.../standalone/StandaloneJobCoordinator.java | 30 +-
.../StandaloneJobCoordinatorFactory.java | 4 +-
.../apache/samza/storage/StorageRecovery.java | 4 +-
.../apache/samza/util/ClassLoaderHelper.java | 19 +
.../samza/zk/ZkCoordinationServiceFactory.java | 4 +-
.../org/apache/samza/zk/ZkJobCoordinator.java | 40 +-
.../samza/zk/ZkJobCoordinatorFactory.java | 10 +-
.../main/java/org/apache/samza/zk/ZkUtils.java | 21 +-
.../samza/config/ShellCommandConfig.scala | 3 +-
.../apache/samza/container/SamzaContainer.scala | 8 +-
.../samza/coordinator/JobModelManager.scala | 6 +-
.../samza/job/local/ProcessJobFactory.scala | 2 +-
.../samza/job/local/ThreadJobFactory.scala | 6 +-
.../clustermanager/MockContainerAllocator.java | 2 +-
.../clustermanager/TestContainerAllocator.java | 28 +-
.../TestContainerProcessManager.java | 25 +-
.../TestContainerRequestState.java | 12 +-
.../TestHostAwareContainerAllocator.java | 45 +--
.../samza/container/TestLocalityManager.java | 12 +-
.../grouper/task/TestGroupByContainerCount.java | 377 ++++++++++---------
.../grouper/task/TestGroupByContainerIds.java | 54 ++-
.../grouper/task/TestTaskAssignmentManager.java | 45 +--
.../samza/container/mock/ContainerMocks.java | 18 +-
.../model/TestSamzaObjectMapper.java | 61 ++-
.../zk/TestZkBarrierForVersionUpgrade.java | 12 +-
.../apache/samza/zk/TestZkLeaderElector.java | 1 -
.../java/org/apache/samza/zk/TestZkUtils.java | 10 +-
.../samza/container/TestSamzaContainer.scala | 56 ++-
.../samza/container/TestTaskInstance.scala | 10 +-
.../samza/coordinator/TestJobCoordinator.scala | 13 +-
.../samza/job/TestShellCommandBuilder.scala | 4 +-
.../samza/storage/kv/RocksDbKeyValueReader.java | 2 +-
.../java/org/apache/samza/rest/model/Task.java | 12 +-
.../samza/rest/proxy/task/SamzaTaskProxy.java | 4 +-
.../samza/monitor/TestLocalStoreMonitor.java | 4 +-
.../rest/resources/mock/MockTaskProxy.java | 8 +-
.../performance/TestKeyValuePerformance.scala | 2 +-
.../test/processor/TestStreamProcessor.java | 19 +-
.../test/integration/StreamTaskTestUtil.scala | 3 +-
.../org/apache/samza/job/yarn/YarnAppState.java | 2 +-
.../job/yarn/YarnClusterResourceManager.java | 23 +-
.../samza/job/yarn/YarnContainerRunner.java | 4 +-
.../samza/validation/YarnJobValidationTool.java | 6 +-
70 files changed, 898 insertions(+), 634 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index fd7333b..4076a51 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -28,7 +28,7 @@ import java.util.Collections;
* A SamzaContainerContext maintains per-container information for the tasks it executes.
*/
public class SamzaContainerContext {
- public final int id;
+ public final String id;
public final Config config;
public final Collection<TaskName> taskNames;
@@ -40,7 +40,7 @@ public class SamzaContainerContext {
* @param taskNames The set of taskName keys for which this container is responsible.
*/
public SamzaContainerContext(
- int id,
+ String id,
Config config,
Collection<TaskName> taskNames) {
this.id = id;
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 6d46f5d..fc7438b 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -30,7 +30,7 @@ import org.apache.samza.config.Config;
*/
public abstract class CommandBuilder {
protected Config config;
- protected int id;
+ protected String id;
protected URL url;
protected String commandPath;
@@ -61,7 +61,7 @@ public abstract class CommandBuilder {
* associated with a specific instantiation of a SamzaContainer.
* @return self to support a builder style of use.
*/
- public CommandBuilder setId(int id) {
+ public CommandBuilder setId(String id) {
this.id = id;
return this;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
new file mode 100644
index 0000000..8790d69
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+@InterfaceStability.Evolving
+public interface ProcessorIdGenerator {
+ /**
+ * Generates a String representation to identify a single instance of StreamProcessor.
+ *
+ * This value can be representative of its current executing environment. It can also be custom-managed by the user,
+ * as long as it adheres to the specification below. More than one processor can co-exist within the same JVM,
+ * as long as their identifiers are guaranteed to be unique.
+ *
+ * <b>Specification of processor identifier</b>:
+ * <ul>
+ * <li>Processor identifier has to be unique among the processors within a job</li>
+ * <li>When more than one processor co-exists within the same JVM, the processor identifier can be of the format:
+ * $x_$y, where 'x' is a unique identifier for the executing JVM and 'y' is a unique identifier for the
+ * processor instance within the JVM. When there is only one processor within a JVM, 'x' should be sufficient to
+ * uniquely identify the processor instance.</li>
+ * </ul>
+ *
+ * <b>Note</b>:
+ * In case of more than one processor within the same JVM, the custom implementation of ProcessorIdGenerator can
+ * contain a static counter, which is incremented on each call to generateProcessorId. The counter value can
+ * be treated as the identifier for the processor instance within the JVM.
+ *
+ * @param config Config instance
+ * @return String Identifier for the processor
+ */
+ String generateProcessorId(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index d47f217..b83d83c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -145,7 +145,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
// Update state
resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
- int containerID = request.getContainerID();
+ String containerID = request.getContainerID();
//run container on resource
log.info("Found available resources on {}. Assigning request for container_id {} with "
@@ -176,9 +176,9 @@ public abstract class AbstractContainerAllocator implements Runnable {
* - when host-affinity is not enabled, or
* - when host-affinity is enabled and job is run for the first time
*/
- public void requestResources(Map<Integer, String> resourceToHostMappings) {
- for (Map.Entry<Integer, String> entry : resourceToHostMappings.entrySet()) {
- int containerId = entry.getKey();
+ public void requestResources(Map<String, String> resourceToHostMappings) {
+ for (Map.Entry<String, String> entry : resourceToHostMappings.entrySet()) {
+ String containerId = entry.getKey();
String preferredHost = entry.getValue();
if (preferredHost == null)
preferredHost = ResourceRequestState.ANY_HOST;
@@ -211,7 +211,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
* this request
* @param preferredHost Name of the host that you prefer to run the container on
*/
- public final void requestResource(int containerID, String preferredHost) {
+ public final void requestResource(String containerID, String preferredHost) {
SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
preferredHost, containerID);
resourceRequestState.addResourceRequest(request);
@@ -242,7 +242,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
* @param samzaContainerId to configure the builder with.
* @return the constructed builder object
*/
- private CommandBuilder getCommandBuilder(int samzaContainerId) {
+ private CommandBuilder getCommandBuilder(String samzaContainerId) {
String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b4309d9..9b5e871 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
* value is the {@link ResourceFailure} object that has a count of failures.
*
*/
- private final Map<Integer, ResourceFailure> containerFailures = new HashMap<>();
+ private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
private final ContainerProcessManagerMetrics metrics;
@@ -173,7 +173,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.neededContainers.set(containerCount);
// Request initial set of containers
- Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
+ Map<String, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
containerAllocator.requestResources(containerToHostMapping);
@@ -228,8 +228,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
*/
public void onResourceCompleted(SamzaResourceStatus containerStatus) {
String containerIdStr = containerStatus.getResourceID();
- int containerId = -1;
- for (Map.Entry<Integer, SamzaResource> entry: state.runningContainers.entrySet()) {
+ String containerId = null;
+ for (Map.Entry<String, SamzaResource> entry: state.runningContainers.entrySet()) {
if (entry.getValue().getResourceID().equals(containerStatus.getResourceID())) {
log.info("Matching container ID found " + entry.getKey() + " " + entry.getValue());
@@ -237,10 +237,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
break;
}
}
- if (containerId == -1) {
+ if (containerId == null) {
log.info("No matching container id found for " + containerStatus.toString());
+ } else {
+ state.runningContainers.remove(containerId);
}
- state.runningContainers.remove(containerId);
int exitStatus = containerStatus.getExitCode();
switch (exitStatus) {
@@ -249,7 +250,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.completedContainers.incrementAndGet();
- if (containerId != -1) {
+ if (containerId != null) {
state.finishedContainers.incrementAndGet();
containerFailures.remove(containerId);
}
@@ -275,7 +276,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
// clean up, and request a new container for the tasks. This only
// should happen if the container was 'lost' due to node failure, not
// if the AM released the container.
- if (containerId != -1) {
+ if (containerId != null) {
log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
state.neededContainers.incrementAndGet();
@@ -295,7 +296,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.failedContainersStatus.put(containerIdStr, containerStatus);
state.jobHealthy.set(false);
- if (containerId != -1) {
+ if (containerId != null) {
state.neededContainers.incrementAndGet();
// Find out previously running container location
String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index da73049..66e2246 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -64,7 +64,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
SamzaResourceRequest request = peekPendingRequest();
log.info("Handling request: " + request.getContainerID() + " " + request.getRequestTimestampMs() + " " + request.getPreferredHost());
String preferredHost = request.getPreferredHost();
- int containerID = request.getContainerID();
+ String containerID = request.getContainerID();
if (hasAllocatedResource(preferredHost)) {
// Found allocated container at preferredHost
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index cf91044..bde3fac 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -99,7 +99,7 @@ public class SamzaApplicationState {
* Map of the samzaContainerId to the {@link SamzaResource} on which it is running
* Modified by both the AMRMCallbackThread and the ContainerAllocator thread
*/
- public final ConcurrentMap<Integer, SamzaResource> runningContainers = new ConcurrentHashMap<Integer, SamzaResource>(0);
+ public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
/**
* Final status of the application
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 3d1560f..4159ff2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -55,14 +55,14 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
/**
* The ID of the StreamProcessor which this request is for.
*/
- private final int containerID;
+ private final String containerID;
/**
* The timestamp in millis when the request was created.
*/
private final long requestTimestampMs;
- public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, int expectedContainerID) {
+ public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String expectedContainerID) {
this.numCores = numCores;
this.memoryMB = memoryMB;
this.preferredHost = preferredHost;
@@ -72,7 +72,7 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
log.info("Resource Request created for {} on {} at {}", new Object[] {this.containerID, this.preferredHost, this.requestTimestampMs});
}
- public int getContainerID() {
+ public String getContainerID() {
return containerID;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
new file mode 100644
index 0000000..708daa6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+/**
+ * Accessors for configs associated with Application scope
+ */
+public class ApplicationConfig extends MapConfig {
+ /**
+ * <p>processor.id is similar to the logical containerId generated in Samza. However, in addition to identifying the JVM
+ * of the processor, it also contains a segment to identify the instance of the
+ * {@link org.apache.samza.processor.StreamProcessor} within the JVM. More detail can be found in
+ * {@link org.apache.samza.runtime.ProcessorIdGenerator}. </p>
+ * <p>
+ * This is an important distinction because Samza 0.13.0 in Yarn has a 1:1 mapping between the processor and the Yarn
+ * container (JVM). However, Samza in an embedded execution can contain more than one processor within the same JVM.
+ * </p>
+ * <b>Note:</b> This identifier has to be unique across the instances of StreamProcessors.
+ * TODO: Deprecated in 0.13. After 0.13+, this id is generated using {@link org.apache.samza.runtime.ProcessorIdGenerator}
+ */
+ @Deprecated
+ public static final String PROCESSOR_ID = "processor.id";
+
+ /**
+ * Class implementing the {@link org.apache.samza.runtime.ProcessorIdGenerator} interface
+ * Used to generate a unique identifier for a {@link org.apache.samza.processor.StreamProcessor} based on the runtime
+ * environment. Hence, the durability of the identifier is the same as the guarantees provided by the runtime environment.
+ */
+ public static final String APP_PROCESSOR_ID_GENERATOR_CLASS = "app.processor-id-generator.class";
+
+ public ApplicationConfig(Config config) {
+ super(config);
+ }
+
+ public String getAppProcessorIdGeneratorClass() {
+ return get(APP_PROCESSOR_ID_GENERATOR_CLASS, null);
+ }
+
+ @Deprecated
+ public String getProcessorId() {
+ return get(PROCESSOR_ID, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index a615d4f..22380d3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -37,7 +37,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
* */
public class LocalityManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
- private Map<Integer, Map<String, String>> containerToHostMapping = new HashMap<>();
+ private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
private final TaskAssignmentManager taskAssignmentManager;
private final boolean writeOnly;
@@ -92,23 +92,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
*
* @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
*/
- public Map<Integer, Map<String, String>> readContainerLocality() {
+ public Map<String, Map<String, String>> readContainerLocality() {
if (this.writeOnly) {
throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
}
- Map<Integer, Map<String, String>> allMappings = new HashMap<>();
+ Map<String, Map<String, String>> allMappings = new HashMap<>();
for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
SetContainerHostMapping mapping = new SetContainerHostMapping(message);
Map<String, String> localityMappings = new HashMap<>();
localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
- allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings);
+ allMappings.put(mapping.getKey(), localityMappings);
}
containerToHostMapping = Collections.unmodifiableMap(allMappings);
- for (Map.Entry<Integer, Map<String, String>> entry : containerToHostMapping.entrySet()) {
+ for (Map.Entry<String, Map<String, String>> entry : containerToHostMapping.entrySet()) {
log.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
}
@@ -123,7 +123,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
* @param jmxAddress the JMX URL address
* @param jmxTunnelingAddress the JMX Tunnel URL address
*/
- public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
+ public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
Map<String, String> existingMappings = containerToHostMapping.get(containerId);
String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 5e6ccf8..246188e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import org.apache.samza.SamzaException;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
@@ -42,6 +44,8 @@ import org.slf4j.LoggerFactory;
* happens to be). No consideration is given towards locality, even distribution
* of aggregate SSPs within a container, even distribution of the number of
* taskNames between containers, etc.
+ *
+ * TODO: SAMZA-1197 - need to modify balance to work with processorId strings
*/
public class GroupByContainerCount implements BalancingTaskNameGrouper {
private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
@@ -74,7 +78,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
// Convert to a Set of ContainerModel
Set<ContainerModel> containerModels = new HashSet<>();
for (int i = 0; i < containerCount; i++) {
- containerModels.add(new ContainerModel(i, taskGroups[i]));
+ containerModels.add(new ContainerModel(String.valueOf(i), i, taskGroups[i]));
}
return Collections.unmodifiableSet(containerModels);
@@ -142,7 +146,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* if the previous mapping doesn't exist or isn't usable.
*/
private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) {
- Map<String, Integer> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+ Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+ taskToContainerId.values().forEach(id -> {
+ try {
+ int intId = Integer.parseInt(id);
+ } catch (NumberFormatException nfe) {
+ throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
+ }
+ });
if (taskToContainerId.isEmpty()) {
log.info("No task assignment map was saved.");
return null;
@@ -178,7 +189,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
for (ContainerModel container : containers) {
for (TaskName taskName : container.getTasks().keySet()) {
- taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getContainerId());
+ taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getProcessorId());
}
}
}
@@ -211,7 +222,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
private List<TaskGroup> createContainers(int startContainerId, int endContainerId) {
List<TaskGroup> containers = new ArrayList<>(endContainerId - startContainerId);
for (int i = startContainerId; i < endContainerId; i++) {
- TaskGroup taskGroup = new TaskGroup(i, new ArrayList<String>());
+ TaskGroup taskGroup = new TaskGroup(String.valueOf(i), new ArrayList<String>());
containers.add(taskGroup);
}
return containers;
@@ -225,10 +236,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* @param taskNamesToAssign the list of tasks to assign to the containers.
* @param containers the containers (as {@link TaskGroup}) to which the tasks will be assigned.
*/
+ // TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount)
private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign,
List<TaskGroup> containers) {
for (TaskGroup taskGroup : containers) {
- for (int j = taskGroup.size(); j < taskCountPerContainer[taskGroup.getContainerId()]; j++) {
+ for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) {
String taskName = taskNamesToAssign.remove(0);
taskGroup.addTaskName(taskName);
log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
@@ -283,7 +295,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
TaskModel model = taskNameToModel.get(taskName);
containerTaskModels.put(model.getTaskName(), model);
}
- containerModels.add(new ContainerModel(container.containerId, containerTaskModels));
+ containerModels.add(
+ new ContainerModel(container.containerId, Integer.valueOf(container.containerId), containerTaskModels));
}
return Collections.unmodifiableSet(containerModels);
}
@@ -294,14 +307,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
* @param taskToContainerId a map from each task name to the containerId to which it is assigned.
* @return a list of TaskGroups ordered ascending by containerId.
*/
- private List<TaskGroup> getOrderedContainers(Map<String, Integer> taskToContainerId) {
+ private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) {
log.debug("Got task to container map: {}", taskToContainerId);
// Group tasks by container Id
- HashMap<Integer, List<String>> containerIdToTaskNames = new HashMap<>();
- for (Map.Entry<String, Integer> entry : taskToContainerId.entrySet()) {
+ HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>();
+ for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) {
String taskName = entry.getKey();
- Integer containerId = entry.getValue();
+ String containerId = entry.getValue();
List<String> taskNames = containerIdToTaskNames.get(containerId);
if (taskNames == null) {
taskNames = new ArrayList<>();
@@ -313,8 +326,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
// Build container tasks
List<TaskGroup> containerTasks = new ArrayList<>(containerIdToTaskNames.size());
for (int i = 0; i < containerIdToTaskNames.size(); i++) {
- if (containerIdToTaskNames.get(i) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
- containerTasks.add(new TaskGroup(i, containerIdToTaskNames.get(i)));
+ if (containerIdToTaskNames.get(String.valueOf(i)) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
+ containerTasks.add(new TaskGroup(String.valueOf(i), containerIdToTaskNames.get(String.valueOf(i))));
}
return containerTasks;
@@ -327,15 +340,15 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
*/
private static class TaskGroup {
private final List<String> taskNames = new LinkedList<>();
- private final Integer containerId;
+ private final String containerId;
- private TaskGroup(Integer containerId, List<String> taskNames) {
+ private TaskGroup(String containerId, List<String> taskNames) {
this.containerId = containerId;
Collections.sort(taskNames); // For consistency because the taskNames came from a Map
this.taskNames.addAll(taskNames);
}
- public Integer getContainerId() {
+ public String getContainerId() {
return containerId;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 6d3f673..f2d88cd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -52,14 +52,14 @@ public class GroupByContainerIds implements TaskNameGrouper {
if (startContainerCount > tasks.size())
throw new IllegalArgumentException("number of containers=" + startContainerCount + " is bigger than number of tasks=" + tasks.size());
- List<Integer> containerIds = new ArrayList<>(startContainerCount);
+ List<String> containerIds = new ArrayList<>(startContainerCount);
for (int i = 0; i < startContainerCount; i++) {
- containerIds.add(i);
+ containerIds.add(String.valueOf(i));
}
return group(tasks, containerIds);
}
- public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+ public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
if (tasks.isEmpty())
throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
.toString(containersIds.toArray()));
@@ -89,7 +89,9 @@ public class GroupByContainerIds implements TaskNameGrouper {
// Convert to a Set of ContainerModel
Set<ContainerModel> containerModels = new HashSet<>();
for (int i = 0; i < containerCount; i++) {
- containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
+ // containerId in ContainerModel constructor is set to -1 because processorId can be any string and does
+ // not have an integer equivalent. So, we set it to -1. After 0.13, this parameter will be removed.
+ containerModels.add(new ContainerModel(containersIds.get(i), -1, taskGroups[i]));
}
return Collections.unmodifiableSet(containerModels);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
index 980f2a9..15cd224 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -33,14 +33,14 @@ import java.util.Set;
public class SingleContainerGrouperFactory implements TaskNameGrouperFactory {
@Override
public TaskNameGrouper build(Config config) {
- return new SingleContainerGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+ return new SingleContainerGrouper(config.get(JobConfig.PROCESSOR_ID()));
}
}
class SingleContainerGrouper implements TaskNameGrouper {
- private final int containerId;
+ private final String containerId;
- SingleContainerGrouper(int containerId) {
+ SingleContainerGrouper(String containerId) {
this.containerId = containerId;
}
@@ -50,7 +50,7 @@ class SingleContainerGrouper implements TaskNameGrouper {
for (TaskModel taskModel: taskModels) {
taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
}
- ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap);
+ ContainerModel containerModel = new ContainerModel(containerId, -1, taskNameTaskModelMap);
return Collections.singleton(containerModel);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 11207b2..d33a22b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* */
public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
- private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
+ private final Map<String, String> taskNameToContainerId = new HashMap<>();
private boolean registered = false;
/**
@@ -70,7 +70,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
*
* @return the map of taskName: containerId
*/
- public Map<String, Integer> readTaskAssignment() {
+ public Map<String, String> readTaskAssignment() {
taskNameToContainerId.clear();
for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
if (message.isDelete()) {
@@ -83,7 +83,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
}
}
- for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) {
+ for (Map.Entry<String, String> entry : taskNameToContainerId.entrySet()) {
log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
}
@@ -96,8 +96,8 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
* @param taskName the task name
* @param containerId the SamzaContainer ID or {@code null} to delete the mapping
*/
- public void writeTaskContainerMapping(String taskName, Integer containerId) {
- Integer existingContainerId = taskNameToContainerId.get(taskName);
+ public void writeTaskContainerMapping(String taskName, String containerId) {
+ String existingContainerId = taskNameToContainerId.get(taskName);
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
log.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
} else {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
index d06bf62..71b80cc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -52,7 +52,7 @@ public interface TaskNameGrouper {
*/
Set<ContainerModel> group(Set<TaskModel> tasks);
- default Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+ default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
return group(tasks);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index 252e56b..af2ef6a 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -23,9 +23,13 @@ import org.apache.samza.job.model.JobModel;
/**
* A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the StreamProcessor.
- * In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require
- * coordination with JobCoordinators of other StreamProcessors.
- * */
+ *
+ * It is the responsibility of the JobCoordinator to assign a unique identifier to the StreamProcessor
+ * based on the underlying environment. In some cases, ID assignment is completely config driven, while in other
+ * cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors.
+ *
+ * This interface contains methods required for the StreamProcessor to interact with JobCoordinator.
+ */
@InterfaceStability.Evolving
public interface JobCoordinator {
/**
@@ -55,12 +59,16 @@ public interface JobCoordinator {
* @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up
*/
boolean awaitStart(long timeoutMs) throws InterruptedException;
+
/**
- * Returns the logical ID assigned to the processor
- * It is up to the user to ensure that different instances of StreamProcessor within a job have unique processor ID.
- * @return integer representing the logical processor ID
+ * Returns the identifier assigned to the processor that is local to the instance of StreamProcessor.
+ *
+ * The semantics and format of the identifier returned should adhere to the specification defined in
+ * {@link org.apache.samza.runtime.ProcessorIdGenerator}
+ *
+ * @return String representing a unique logical processor ID
*/
- int getProcessorId();
+ String getProcessorId();
/**
* Returns the current JobModel
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index d15bce1..8553f59 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -26,11 +26,10 @@ import org.apache.samza.processor.SamzaContainerController;
@InterfaceStability.Evolving
public interface JobCoordinatorFactory {
/**
- * @param processorId Unique identifier for the processor
* @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
* @param containerController Controller interface for starting and stopping container. In future, it may simply
* pause the container and add/remove tasks
* @return An instance of IJobCoordinator
*/
- JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
+ JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
index 431c05d..f8d4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
@@ -43,8 +43,9 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
public static final String TYPE = "set-task-container-assignment";
public static final String CONTAINER_KEY = "containerId";
+
/**
- * SteContainerToHostMapping is used to set the container to host mapping information.
+ * SetTaskContainerMapping is used to set the task to container assignment information.
* @param message which holds the container to host information.
*/
public SetTaskContainerMapping(CoordinatorStreamMessage message) {
@@ -64,8 +65,8 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
putMessageValue(CONTAINER_KEY, containerId);
}
- public Integer getTaskAssignment() {
- return Integer.parseInt(getMessageValue(CONTAINER_KEY));
+ public String getTaskAssignment() {
+ return getMessageValue(CONTAINER_KEY);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index ed721b1..bd4fa94 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -19,9 +19,10 @@
package org.apache.samza.job.model;
+import org.apache.samza.container.TaskName;
+
import java.util.Collections;
import java.util.Map;
-import org.apache.samza.container.TaskName;
/**
* <p>
@@ -35,34 +36,49 @@ import org.apache.samza.container.TaskName;
* containers have tasks. Each data model contains relevant information, such as
* an id, partition information, etc.
* </p>
+ * <p>
+ * <b>Note</b>: This class has a natural ordering that is inconsistent with equals.
+ * </p>
*/
-public class ContainerModel implements Comparable<ContainerModel> {
+public class ContainerModel {
+ @Deprecated
private final int containerId;
+ private final String processorId;
private final Map<TaskName, TaskModel> tasks;
- public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) {
+ public ContainerModel(String processorId, int containerId, Map<TaskName, TaskModel> tasks) {
this.containerId = containerId;
+ if (processorId == null) {
+ this.processorId = String.valueOf(containerId);
+ } else {
+ this.processorId = processorId;
+ }
this.tasks = Collections.unmodifiableMap(tasks);
}
+ @Deprecated
public int getContainerId() {
return containerId;
}
+ public String getProcessorId() {
+ return processorId;
+ }
+
public Map<TaskName, TaskModel> getTasks() {
return tasks;
}
@Override
public String toString() {
- return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]";
+ return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + containerId;
+ result = prime * result + ((processorId == null) ? 0 : processorId.hashCode());
result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
return result;
}
@@ -76,7 +92,7 @@ public class ContainerModel implements Comparable<ContainerModel> {
if (getClass() != obj.getClass())
return false;
ContainerModel other = (ContainerModel) obj;
- if (containerId != other.containerId)
+ if (!processorId.equals(other.processorId))
return false;
if (tasks == null) {
if (other.tasks != null)
@@ -86,7 +102,4 @@ public class ContainerModel implements Comparable<ContainerModel> {
return true;
}
- public int compareTo(ContainerModel other) {
- return containerId - other.getContainerId();
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dbd6dcc..dbb3867 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -41,24 +41,24 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
public class JobModel {
private static final String EMPTY_STRING = "";
private final Config config;
- private final Map<Integer, ContainerModel> containers;
+ private final Map<String, ContainerModel> containers;
private final LocalityManager localityManager;
- private Map<Integer, String> localityMappings = new HashMap<Integer, String>();
+ private Map<String, String> localityMappings = new HashMap<String, String>();
public int maxChangeLogStreamPartitions;
- public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+ public JobModel(Config config, Map<String, ContainerModel> containers) {
this(config, containers, null);
}
- public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) {
+ public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
this.config = config;
this.containers = Collections.unmodifiableMap(containers);
this.localityManager = localityManager;
if (localityManager == null) {
- for (Integer containerId : containers.keySet()) {
+ for (String containerId : containers.keySet()) {
localityMappings.put(containerId, null);
}
} else {
@@ -89,7 +89,7 @@ public class JobModel {
* @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
* @return the value if it exists for a given container and key, otherwise an empty string
*/
- public String getContainerToHostValue(Integer containerId, String key) {
+ public String getContainerToHostValue(String containerId, String key) {
if (localityManager == null) {
return EMPTY_STRING;
}
@@ -103,12 +103,12 @@ public class JobModel {
return mappings.get(key);
}
- public Map<Integer, String> getAllContainerToHostValues(String key) {
+ public Map<String, String> getAllContainerToHostValues(String key) {
if (localityManager == null) {
return Collections.EMPTY_MAP;
}
- Map<Integer, String> allValues = new HashMap<>();
- for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
+ Map<String, String> allValues = new HashMap<>();
+ for (Map.Entry<String, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
String value = entry.getValue().get(key);
if (value != null) {
allValues.put(entry.getKey(), value);
@@ -118,8 +118,8 @@ public class JobModel {
}
private void populateContainerLocalityMappings() {
- Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
- for (Integer containerId: containers.keySet()) {
+ Map<String, Map<String, String>> allMappings = localityManager.readContainerLocality();
+ for (String containerId: containers.keySet()) {
if (allMappings.containsKey(containerId)) {
localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
} else {
@@ -128,14 +128,14 @@ public class JobModel {
}
}
- public Map<Integer, String> getAllContainerLocality() {
+ public Map<String, String> getAllContainerLocality() {
if (localityManager != null) {
populateContainerLocalityMappings();
}
return localityMappings;
}
- public Map<Integer, ContainerModel> getContainers() {
+ public Map<String, ContainerModel> getContainers() {
return containers;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index 76e2053..c292067 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeoutException;
public class SamzaContainerController {
private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class);
- private final ExecutorService executorService;
+ private ExecutorService executorService;
private volatile SamzaContainer container;
private final Map<String, MetricsReporter> metricsReporterMap;
private final Object taskFactory;
@@ -60,16 +60,12 @@ public class SamzaContainerController {
* @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
* {@link org.apache.samza.task.AsyncStreamTask}
* @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
- * @param processorId Id of the processor
* @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
*/
public SamzaContainerController(
Object taskFactory,
long containerShutdownMs,
- String processorId,
Map<String, MetricsReporter> metricsReporterMap) {
- this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("p" + processorId + "-container-thread-%d").build());
this.taskFactory = taskFactory;
this.metricsReporterMap = metricsReporterMap;
if (containerShutdownMs == -1) {
@@ -94,11 +90,11 @@ public class SamzaContainerController {
public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
LocalityManager localityManager = null;
if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
- localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config);
+ localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), config);
}
- log.info("About to create container: " + containerModel.getContainerId());
+ log.info("About to create container: " + containerModel.getProcessorId());
container = SamzaContainer$.MODULE$.apply(
- containerModel.getContainerId(),
+ containerModel.getProcessorId(),
containerModel,
config,
maxChangelogStreamPartitions,
@@ -106,7 +102,9 @@ public class SamzaContainerController {
new JmxServer(),
Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
taskFactory);
- log.info("About to start container: " + containerModel.getContainerId());
+ log.info("About to start container: " + containerModel.getProcessorId());
+ executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("p-" + containerModel.getProcessorId() + "-container-thread-%d").build());
containerFuture = executorService.submit(() -> container.run());
}
@@ -148,6 +146,8 @@ public class SamzaContainerController {
*/
public void shutdown() {
stopContainer();
- executorService.shutdown();
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index a39c3b9..5a8673a 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -19,9 +19,10 @@
package org.apache.samza.processor;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -29,10 +30,7 @@ import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -60,17 +58,6 @@ import java.util.Map;
*/
@InterfaceStability.Evolving
public class StreamProcessor {
- private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
- /**
- * processor.id is equivalent to containerId in samza. It is a logical identifier used by Samza for a processor.
- * In a distributed environment, this logical identifier is mapped to a physical identifier of the resource. For
- * example, Yarn provides a "containerId" for every resource it allocates.
- * In an embedded environment, this identifier is provided by the user by directly using the StreamProcessor API.
- * <p>
- * <b>Note:</b>This identifier has to be unique across the instances of StreamProcessors.
- */
- private static final String PROCESSOR_ID = "processor.id";
- private final int processorId;
private final JobCoordinator jobCoordinator;
/**
@@ -83,51 +70,49 @@ public class StreamProcessor {
* <p>
* <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
*
- * @param processorId Unique identifier for a processor within the job. It has the same semantics as
- * "containerId" in Samza
* @param config Instance of config object - contains all configuration required for processing
* @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
* @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
*/
- public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
AsyncStreamTaskFactory asyncStreamTaskFactory) {
- this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
+ this(config, customMetricsReporters, (Object) asyncStreamTaskFactory);
}
/**
- *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+ *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory)}, except task instances are created
* using the provided {@link StreamTaskFactory}.
- * @param processorId - this processor Id
* @param config - config
* @param customMetricsReporters metric Reporter
* @param streamTaskFactory task factory to instantiate the Task
*/
- public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
StreamTaskFactory streamTaskFactory) {
- this(processorId, config, customMetricsReporters, (Object) streamTaskFactory);
+ this(config, customMetricsReporters, (Object) streamTaskFactory);
}
- private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+ private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
Object taskFactory) {
- this.processorId = processorId;
-
- Map<String, String> updatedConfigMap = new HashMap<>();
- updatedConfigMap.putAll(config);
- updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
- Config updatedConfig = new MapConfig(updatedConfigMap);
+ // TODO: This check to be removed after 0.13+
+ ApplicationConfig applicationConfig = new ApplicationConfig(config);
+ if (applicationConfig.getProcessorId() == null &&
+ applicationConfig.getAppProcessorIdGeneratorClass() == null) {
+ throw new ConfigException(
+ String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+ ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+ }
SamzaContainerController containerController = new SamzaContainerController(
taskFactory,
- new TaskConfigJava(updatedConfig).getShutdownMs(),
- String.valueOf(processorId),
+ new TaskConfigJava(config).getShutdownMs(),
customMetricsReporters);
this.jobCoordinator = Util.
<JobCoordinatorFactory>getObj(
- new JobCoordinatorConfig(updatedConfig)
+ new JobCoordinatorConfig(config)
.getJobCoordinatorFactoryClassName())
- .getJobCoordinator(processorId, updatedConfig, containerController);
+ .getJobCoordinator(config, containerController);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 49c3228..d790fb1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -52,9 +52,9 @@ import org.slf4j.LoggerFactory;
public class LocalContainerRunner extends AbstractApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
private final JobModel jobModel;
- private final int containerId;
+ private final String containerId;
- public LocalContainerRunner(JobModel jobModel, int containerId) {
+ public LocalContainerRunner(JobModel jobModel, String containerId) {
super(jobModel.getConfig());
this.jobModel = jobModel;
this.containerId = containerId;
@@ -69,13 +69,13 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
SamzaContainer container = SamzaContainer$.MODULE$.apply(
- containerModel.getContainerId(),
+ containerModel.getProcessorId(),
containerModel,
config,
jobModel.maxChangeLogStreamPartitions,
SamzaContainer.getLocalityManager(containerId, config),
jmxServer,
- Util.javaMapAsScalaMap(new HashMap<String, MetricsReporter>()),
+ Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
taskFactory);
container.run();
@@ -104,7 +104,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
System.exit(1);
});
- Integer containerId = Integer.valueOf(System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
+ String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
- log.info(String.format("Got container ID: %d", containerId));
+ log.info(String.format("Got container ID: %s", containerId));
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
new file mode 100644
index 0000000..afc20b1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+import java.util.UUID;
+
+public class UUIDGenerator implements ProcessorIdGenerator {
+ /**
+ * Generates a String representation to identify the processor instance
+ * This value can be representative of its current executing environment. It can also be custom-managed by the user.
+ * <p>
+ * <b>Note</b>: When more than one processor exists within the same JVM, there is no need to use a static counter in
+ * this generator to adhere to the "$x_$y" format specified in {@link ProcessorIdGenerator}, since each UUID is already
+ * unique by itself.
+ *
+ * @param config Config instance
+ * @return String Identifier for the processor
+ */
+ @Override
+ public String generateProcessorId(Config config) {
+ return UUID.randomUUID().toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
index f197a95..e19afec 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -19,23 +19,28 @@
package org.apache.samza.serializers.model;
-import java.util.Map;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskModel;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.Map;
+
/**
* A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
*/
public abstract class JsonContainerModelMixIn {
@JsonCreator
- public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
+ public JsonContainerModelMixIn(@JsonProperty("processor-id") String processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
}
+ @Deprecated
@JsonProperty("container-id")
abstract int getContainerId();
+ @JsonProperty("processor-id")
+ abstract String getProcessorId();
+
@JsonProperty("tasks")
abstract Map<TaskName, TaskModel> getTasks();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
index 037b5e2..4b0c404 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -30,12 +30,12 @@ import org.codehaus.jackson.annotate.JsonProperty;
*/
public abstract class JsonJobModelMixIn {
@JsonCreator
- public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) {
+ public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<String, ContainerModel> containers) {
}
@JsonProperty("config")
abstract Config getConfig();
@JsonProperty("containers")
- abstract Map<Integer, ContainerModel> getContainers();
+ abstract Map<String, ContainerModel> getContainers();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 83e6b8c..f8c4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -19,10 +19,8 @@
package org.apache.samza.serializers.model;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
@@ -50,6 +48,10 @@ import org.codehaus.jackson.map.introspect.AnnotatedMethod;
import org.codehaus.jackson.map.module.SimpleModule;
import org.codehaus.jackson.type.TypeReference;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* <p>
* A collection of utility classes and (de)serializers to make Samza's job model
@@ -89,10 +91,30 @@ public class SamzaObjectMapper {
mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
- mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+ module.addDeserializer(ContainerModel.class, new JsonDeserializer<ContainerModel>() {
+ @Override
+ public ContainerModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+ ObjectCodec oc = jp.getCodec();
+ JsonNode node = oc.readTree(jp);
+ // Validate that container-id is present before reading it; dereferencing the missing node first
+ // would raise a NullPointerException and the corrupt-JobModel error below could never be reported.
+ JsonNode containerIdNode = node.get("container-id");
+ if (containerIdNode == null) {
+ throw new SamzaException("JobModel did not contain a container-id. This can never happen. JobModel corrupt!");
+ }
+ int containerId = containerIdNode.getIntValue();
+ // When processor-id is absent, fall back to the string form of container-id.
+ JsonNode processorIdNode = node.get("processor-id");
+ String processorId = processorIdNode == null ? String.valueOf(containerId) : processorIdNode.getTextValue();
+ Map<TaskName, TaskModel> tasksMapping =
+ OBJECT_MAPPER.readValue(node.get("tasks"), new TypeReference<Map<TaskName, TaskModel>>() { });
+ return new ContainerModel(processorId, containerId, tasksMapping);
+ }
+ });
+
// Convert camel case to hyphenated field names, and register the module.
mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
mapper.registerModule(module);
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index b2927f4..7efc6df 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -19,22 +19,25 @@
package org.apache.samza.standalone;
import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -63,28 +66,37 @@ import java.util.Map;
* */
public class StandaloneJobCoordinator implements JobCoordinator {
private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
- private final int processorId;
+ private final String processorId;
private final Config config;
private final JobModel jobModel;
private final SamzaContainerController containerController;
@VisibleForTesting
StandaloneJobCoordinator(
- int processorId,
+ ProcessorIdGenerator processorIdGenerator,
Config config,
SamzaContainerController containerController,
JobModel jobModel) {
- this.processorId = processorId;
+ this.processorId = processorIdGenerator.generateProcessorId(config);
this.config = config;
this.containerController = containerController;
this.jobModel = jobModel;
}
- public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
- this.processorId = processorId;
+ public StandaloneJobCoordinator(Config config, SamzaContainerController containerController) {
this.config = config;
this.containerController = containerController;
+ ApplicationConfig appConfig = new ApplicationConfig(config);
+ if (appConfig.getProcessorId() != null) { // TODO: This check to be removed after 0.13+
+ this.processorId = appConfig.getProcessorId();
+ } else {
+ ProcessorIdGenerator idGenerator =
+ ClassLoaderHelper.fromClassName(
+ new ApplicationConfig(config).getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+ this.processorId = idGenerator.generateProcessorId(config);
+ }
+
JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
Map<String, SystemAdmin> systemAdmins = new HashMap<>();
for (String systemName: systemConfig.getSystemNames()) {
@@ -113,7 +125,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
// No-op
JobModel jobModel = getJobModel();
containerController.startContainer(
- jobModel.getContainers().get(processorId),
+ jobModel.getContainers().get(getProcessorId()),
jobModel.getConfig(),
jobModel.maxChangeLogStreamPartitions);
}
@@ -137,8 +149,8 @@ public class StandaloneJobCoordinator implements JobCoordinator {
}
@Override
- public int getProcessorId() {
- return this.processorId;
+ public String getProcessorId() {
+ return processorId;
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 7ca85c0..eada6e9 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -25,7 +25,7 @@ import org.apache.samza.processor.SamzaContainerController;
public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
@Override
- public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
- return new StandaloneJobCoordinator(processorId, config, containerController);
+ public JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController) {
+ return new StandaloneJobCoordinator(config, containerController);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 9471a23..e50f221 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -65,7 +65,7 @@ public class StorageRecovery extends CommandLine {
private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
- private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>();
+ private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>();
private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
@@ -211,7 +211,7 @@ public class StorageRecovery extends CommandLine {
for (ContainerModel containerModel : containers.values()) {
HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
- SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks()
+ SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks()
.keySet());
for (TaskModel taskModel : containerModel.getTasks().values()) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
index f2b389b..3680b4f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
@@ -19,6 +19,10 @@
package org.apache.samza.util;
+import org.apache.samza.config.ConfigException;
+
+import java.lang.reflect.Constructor;
+
public class ClassLoaderHelper {
public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -26,4 +30,19 @@ public class ClassLoaderHelper {
T instance = clazz.newInstance();
return instance;
}
+
+ /**
+ * Loads the class named {@code className}, verifies that it is assignable to {@code classType}, and
+ * instantiates it through its public no-argument constructor.
+ *
+ * @param className fully-qualified name of the class to load
+ * @param classType type the loaded class is required to be assignable to
+ * @param <T> type of the returned instance
+ * @return a new instance of {@code className}, cast to {@code classType}
+ * @throws ConfigException if the class is not of type {@code classType}, or if it cannot be loaded or instantiated
+ */
+ public static <T> T fromClassName(String className, Class<T> classType) {
+ try {
+ Class<?> loadedClass = Class.forName(className);
+ if (!classType.isAssignableFrom(loadedClass)) {
+ throw new ConfigException(String.format(
+ "Class %s is not of type %s", className, classType));
+ }
+ Constructor<?> constructor = loadedClass.getConstructor();
+ // Checked cast; the assignability test above guarantees it succeeds.
+ return classType.cast(constructor.newInstance());
+ } catch (ConfigException e) {
+ // Surface the type-mismatch error as-is rather than re-wrapping it as a loading problem.
+ throw e;
+ } catch (Exception e) {
+ throw new ConfigException(String.format(
+ "Problem in loading %s class %s", classType, className), e);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index cc454e3..21a6b03 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -26,12 +26,10 @@ import org.apache.samza.coordinator.CoordinationServiceFactory;
public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
-
-
synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
ZkConfig zkConfig = new ZkConfig(config);
ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
- ZkUtils zkUtils = new ZkUtils(participantId, new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+ ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer);
}