You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/07 22:22:28 UTC

[1/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza

Repository: samza
Updated Branches:
  refs/heads/master c74722b47 -> a7da1840f


http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index d3eb7fb..1b5c904 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
@@ -34,9 +34,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestTaskAssignmentManager {
-
-  private final MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
-      new MockCoordinatorStreamSystemFactory();
   private final Config config = new MapConfig(
       new HashMap<String, String>() {
         {
@@ -56,6 +53,8 @@ public class TestTaskAssignmentManager {
   }
 
   @Test public void testTaskAssignmentManager() throws Exception {
+    MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+        new MockCoordinatorStreamSystemFactory();
     MockCoordinatorStreamSystemProducer producer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
@@ -70,22 +69,22 @@ public class TestTaskAssignmentManager {
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
-    Map<String, Integer> expectedMap =
-      new HashMap<String, Integer>() {
+    Map<String, String> expectedMap =
+      new HashMap<String, String>() {
         {
-          this.put("Task0", new Integer(0));
-          this.put("Task1", new Integer(1));
-          this.put("Task2", new Integer(2));
-          this.put("Task3", new Integer(0));
-          this.put("Task4", new Integer(1));
+          this.put("Task0", "0");
+          this.put("Task1", "1");
+          this.put("Task2", "2");
+          this.put("Task3", "0");
+          this.put("Task4", "1");
         }
       };
 
-    for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) {
+    for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
       taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
     }
 
-    Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+    Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
 
     assertEquals(expectedMap, localMap);
 
@@ -95,6 +94,8 @@ public class TestTaskAssignmentManager {
   }
 
   @Test public void testDeleteMappings() throws Exception {
+    MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+        new MockCoordinatorStreamSystemFactory();
     MockCoordinatorStreamSystemProducer producer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
@@ -109,23 +110,23 @@ public class TestTaskAssignmentManager {
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
-    Map<String, Integer> expectedMap =
-      new HashMap<String, Integer>() {
+    Map<String, String> expectedMap =
+      new HashMap<String, String>() {
         {
-          this.put("Task0", new Integer(0));
-          this.put("Task1", new Integer(1));
+          this.put("Task0", "0");
+          this.put("Task1", "1");
         }
       };
 
-    for (Map.Entry<String, Integer> entry : expectedMap.entrySet()) {
+    for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
       taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
     }
 
-    Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+    Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
     assertEquals(expectedMap, localMap);
 
     taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());
-    Map<String, Integer> deletedMap = taskAssignmentManager.readTaskAssignment();
+    Map<String, String> deletedMap = taskAssignmentManager.readTaskAssignment();
     assertTrue(deletedMap.isEmpty());
 
     taskAssignmentManager.stop();
@@ -134,6 +135,8 @@ public class TestTaskAssignmentManager {
   }
 
   @Test public void testTaskAssignmentManagerEmptyCoordinatorStream() throws Exception {
+    MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+        new MockCoordinatorStreamSystemFactory();
     MockCoordinatorStreamSystemProducer producer =
         mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
     MockCoordinatorStreamSystemConsumer consumer =
@@ -148,8 +151,8 @@ public class TestTaskAssignmentManager {
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
-    Map<String, Integer> expectedMap = new HashMap<>();
-    Map<String, Integer> localMap = taskAssignmentManager.readTaskAssignment();
+    Map<String, String> expectedMap = new HashMap<>();
+    Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
 
     assertEquals(expectedMap, localMap);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
index 3b184d3..9369f4b 100644
--- a/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
+++ b/samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java
@@ -46,17 +46,17 @@ public class ContainerMocks {
       }
       j += taskCountPerContainer[i];
 
-      models.add(createContainerModel(i, partitions));
+      models.add(createContainerModel(String.valueOf(i), partitions));
     }
     return models;
   }
 
-  public static Map<String, Integer> generateTaskAssignments(int numContainers, int taskCount) {
-    Map<String, Integer> mapping = new HashMap<>(taskCount);
+  public static Map<String, String> generateTaskAssignments(int numContainers, int taskCount) {
+    Map<String, String> mapping = new HashMap<>(taskCount);
     Set<ContainerModel> containers = generateContainerModels(numContainers, taskCount);
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        mapping.put(taskName.getTaskName(), container.getContainerId());
+        mapping.put(taskName.getTaskName(), container.getProcessorId());
       }
     }
     return mapping;
@@ -73,12 +73,12 @@ public class ContainerMocks {
     return newTaskCountPerContainer;
   }
 
-  public static ContainerModel createContainerModel(int containerId, int[] partitions) {
+  public static ContainerModel createContainerModel(String containerId, int[] partitions) {
     Map<TaskName, TaskModel> tasks = new HashMap<>();
     for (int partition : partitions) {
       tasks.put(getTaskName(partition), getTaskModel(partition));
     }
-    return new ContainerModel(containerId, tasks);
+    return new ContainerModel(containerId, -1, tasks);
   }
 
   public static Set<TaskModel> generateTaskModels(int[] partitions) {
@@ -117,11 +117,11 @@ public class ContainerMocks {
     return values;
   }
 
-  public static Map<String, Integer> generateTaskContainerMapping(Set<ContainerModel> containers) {
-    Map<String, Integer> taskMapping = new HashMap<>();
+  public static Map<String, String> generateTaskContainerMapping(Set<ContainerModel> containers) {
+    Map<String, String> taskMapping = new HashMap<>();
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        taskMapping.put(taskName.getTaskName(), container.getContainerId());
+        taskMapping.put(taskName.getTaskName(), container.getProcessorId());
       }
     }
     return taskMapping;

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 2c64598..505acac 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -19,12 +19,7 @@
 
 package org.apache.samza.serializers.model;
 
-import static org.junit.Assert.assertEquals;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
+import junit.framework.Assert;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -34,12 +29,22 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStreamPartition;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
 public class TestSamzaObjectMapper {
-  @Test
-  public void testJsonTaskModel() throws Exception {
-    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private JobModel jobModel;
+
+  @Before
+  public void setup() throws IOException {
     Map<String, String> configMap = new HashMap<String, String>();
     Set<SystemStreamPartition> ssp = new HashSet<>();
     configMap.put("a", "b");
@@ -49,12 +54,42 @@ public class TestSamzaObjectMapper {
     TaskModel taskModel = new TaskModel(taskName, ssp, new Partition(2));
     Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
     tasks.put(taskName, taskModel);
-    ContainerModel containerModel = new ContainerModel(1, tasks);
-    Map<Integer, ContainerModel> containerMap = new HashMap<Integer, ContainerModel>();
-    containerMap.put(Integer.valueOf(1), containerModel);
-    JobModel jobModel = new JobModel(config, containerMap);
+    ContainerModel containerModel = new ContainerModel("1", 1, tasks);
+    Map<String, ContainerModel> containerMap = new HashMap<String, ContainerModel>();
+    containerMap.put("1", containerModel);
+    jobModel = new JobModel(config, containerMap);
+  }
+
+  @Test
+  public void testJsonTaskModel() throws Exception {
+    ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+
     String str = mapper.writeValueAsString(jobModel);
     JobModel obj = mapper.readValue(str, JobModel.class);
     assertEquals(jobModel, obj);
   }
+
+  /**
+   * Critical test to guarantee compatibility between samza 0.12 container models and 0.13+
+   *
+   * Samza 0.12 contains only "container-id" (integer) in the ContainerModel. "processor-id" (String) is added in 0.13.
+   * When serializing, we serialize both the fields in 0.13. Deserialization correctly handles the fields in 0.13.
+   */
+  @Test
+  public void testContainerModelCompatible() {
+    try {
+      String newJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"processor-id\":\"1\",\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+      ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+      JobModel jobModel = mapper.readValue(newJobModelString, JobModel.class);
+
+      String oldJobModelString = "{\"config\":{\"a\":\"b\"},\"containers\":{\"1\":{\"container-id\":1,\"tasks\":{\"test\":{\"task-name\":\"test\",\"system-stream-partitions\":[{\"system\":\"foo\",\"partition\":1,\"stream\":\"bar\"}],\"changelog-partition\":2}}}},\"max-change-log-stream-partitions\":3,\"all-container-locality\":{\"1\":null}}";
+      ObjectMapper mapper1 = SamzaObjectMapper.getObjectMapper();
+      JobModel jobModel1 = mapper1.readValue(oldJobModelString, JobModel.class);
+
+      Assert.assertEquals(jobModel, jobModel1);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index c0c0e6a..f5da9da 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -18,17 +18,13 @@
  */
 package org.apache.samza.zk;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import junit.framework.Assert;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.BarrierForVersionUpgrade;
-import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -36,6 +32,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 
 public class TestZkBarrierForVersionUpgrade {
   private static EmbeddedZookeeper zkServer = null;
@@ -190,6 +191,5 @@ public class TestZkBarrierForVersionUpgrade {
       }
     });
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
-
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index fb31054..e4f8944 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -531,7 +531,6 @@ public class TestZkLeaderElector {
   private ZkUtils getZkUtilsWithNewClient(String processorId) {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
-        processorId,
         KEY_BUILDER,
         ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
         CONNECTION_TIMEOUT_MS);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index fba6d0f..749c674 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,9 +18,6 @@
  */
 package org.apache.samza.zk;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -37,6 +34,10 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -67,7 +68,6 @@ public class TestZkUtils {
     }
 
     zkUtils = new ZkUtils(
-        "testProcessorId",
         KEY_BUILDER,
         zkClient,
         SESSION_TIMEOUT_MS);
@@ -188,7 +188,7 @@ public class TestZkUtils {
 
     // create job model
     Map<String, String> configMap = new HashMap<>();
-    Map<Integer, ContainerModel> containers = new HashMap<>();
+    Map<String, ContainerModel> containers = new HashMap<>();
     MapConfig config = new MapConfig(configMap);
     JobModel jobModel = new JobModel(config, containers);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index bb72b72..010ff7e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.container
 
-import java.lang.Thread.UncaughtExceptionHandler
 import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
@@ -29,37 +28,24 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.serializers._
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
+import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemConsumer
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemProducer
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.task.ClosableTask
-import org.apache.samza.task.InitableTask
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskContext
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemConsumer, SystemConsumers, SystemProducer, SystemProducers, SystemStream, SystemStreamPartition}
+import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator, TaskInstanceCollector}
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
-import org.mockito.Mockito._
+import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
+import org.mockito.Mockito.when
+import scala.collection.JavaConversions._
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Test
@@ -71,9 +57,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
       new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, tasks.asJava),
-      Integer.valueOf(1) -> new ContainerModel(1, tasks.asJava))
-    val jobModel = new JobModel(config, containers.asJava)
+      "0" -> new ContainerModel("0", 0, tasks),
+      "1" -> new ContainerModel("1", 0, tasks))
+    val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
@@ -96,9 +82,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
       new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, tasks.asJava),
-      Integer.valueOf(1) -> new ContainerModel(1, tasks.asJava))
-    val jobModel = new JobModel(config, containers.asJava)
+      "0" -> new ContainerModel("0", 0, tasks),
+      "1" -> new ContainerModel("1", 1, tasks))
+    val jobModel = new JobModel(config, containers)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
@@ -126,12 +112,12 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets.keySet(), new Partition(2)),
       new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets.keySet(), new Partition(3)),
       new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets.keySet(), new Partition(4)))
-    val containerModel1 = new ContainerModel(0, tasksForContainer1.asJava)
-    val containerModel2 = new ContainerModel(1, tasksForContainer2.asJava)
+    val containerModel1 = new ContainerModel("0", 0, tasksForContainer1)
+    val containerModel2 = new ContainerModel("1", 1, tasksForContainer2)
     val containers = Map(
-      Integer.valueOf(0) -> containerModel1,
-      Integer.valueOf(1) -> containerModel2)
-    val jobModel = new JobModel(config, containers.asJava)
+      "0" -> containerModel1,
+      "1" -> containerModel2)
+    val jobModel = new JobModel(config, containers)
     assertEquals(jobModel.maxChangeLogStreamPartitions, 5)
   }
 
@@ -179,7 +165,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -238,7 +224,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -287,7 +273,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set[TaskName](taskName))
     val mockTaskStorageManager = mock[TaskStorageManager]
 
     when(mockTaskStorageManager.init).thenAnswer(new Answer[String] {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 086eb85..40974a6 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -70,7 +70,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -165,7 +165,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -222,7 +222,7 @@ class TestTaskInstance {
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -281,7 +281,7 @@ class TestTaskInstance {
     val metrics = new TaskInstanceMetrics()
     val taskName = new TaskName("Offset Reset Task 0")
     val collector = new TaskInstanceCollector(producers)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
 
     val offsetManager = new OffsetManager()
 
@@ -316,7 +316,7 @@ class TestTaskInstance {
     val metrics = new TaskInstanceMetrics()
     val taskName = new TaskName("testing")
     val collector = new TaskInstanceCollector(producers)
-    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
+    val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava)
     val offsetManager = new OffsetManager()
     offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
     val systemAdmins = Map("system" -> new MockSystemAdmin)

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index e7397bb..0b6dd8b 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -79,8 +79,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val container1Tasks = Map(
       task1Name -> new TaskModel(task1Name, checkpoint1.keySet.asJava, new Partition(3)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava),
-      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks.asJava))
+      "0" -> new ContainerModel("0", 0, container0Tasks.asJava),
+      "1" -> new ContainerModel("1", 1, container1Tasks.asJava))
 
 
     // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
@@ -151,9 +151,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val container1Tasks = Map(
       task1Name -> new TaskModel(task1Name, ssp1.asJava, new Partition(3)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava),
-      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks.asJava))
-
+      Integer.valueOf(0) -> new ContainerModel("0", 0, container0Tasks.asJava),
+      Integer.valueOf(1) -> new ContainerModel("1", 1, container1Tasks.asJava))
     val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
 
     // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
@@ -211,7 +210,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val container0Tasks = Map(
       task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava))
+      "0" -> new ContainerModel("0", 0, container0Tasks.asJava))
     val jobModel = new JobModel(config, containers.asJava)
     assertEquals(config, coordinator.jobModel.getConfig)
     assertEquals(jobModel, coordinator.jobModel)
@@ -233,7 +232,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0)))
 
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava))
+      "0" -> new ContainerModel("0", 0, container0Tasks.asJava))
     val jobModel = new JobModel(config, containers.asJava)
     assertEquals(config, coordinator.jobModel.getConfig)
     assertEquals(jobModel, coordinator.jobModel)

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
index 47e1b0a..4df53fd 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -33,7 +33,7 @@ class TestShellCommandBuilder {
     val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
     val scb = new ShellCommandBuilder
     scb.setConfig(config)
-    scb.setId(1)
+    scb.setId("1")
     scb.setUrl(new URL(urlStr))
     val command = scb.buildCommand
     val environment = scb.buildEnvironment
@@ -49,7 +49,7 @@ class TestShellCommandBuilder {
     val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
     val scb = new ShellCommandBuilder
     scb.setConfig(config)
-    scb.setId(1)
+    scb.setId("1")
     scb.setUrl(new URL(urlStr))
     val command = scb.buildCommand
     assertEquals("foo", command)

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
index f570422..8e8cc31 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -66,7 +66,7 @@ public class RocksDbKeyValueReader {
     ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
     taskNameList.add(new TaskName("read-rocks-db"));
     SamzaContainerContext samzaContainerContext =
-        new SamzaContainerContext(0,  config, taskNameList);
+        new SamzaContainerContext("0",  config, taskNameList);
     Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
 
     // open the db

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
index f1225ec..94e8370 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
@@ -36,7 +36,7 @@ public class Task {
   private String taskName;
 
   // containerId of the samza container in which the task is running
-  private int containerId;
+  private String containerId;
 
   // list of partitions that belong to the task.
   private List<Partition> partitions;
@@ -49,7 +49,7 @@ public class Task {
 
   public Task(@JsonProperty("preferredHost") String preferredHost,
               @JsonProperty("taskName") String taskName,
-              @JsonProperty("containerId") int containerId,
+              @JsonProperty("containerId") String containerId,
               @JsonProperty("partitions") List<Partition> partitions,
               @JsonProperty("storeNames") List<String> storeNames) {
     this.preferredHost = preferredHost;
@@ -67,11 +67,11 @@ public class Task {
     this.preferredHost = preferredHost;
   }
 
-  public int getContainerId() {
+  public String getContainerId() {
     return containerId;
   }
 
-  public void setContainerId(int containerId) {
+  public void setContainerId(String containerId) {
     this.containerId = containerId;
   }
 
@@ -110,7 +110,7 @@ public class Task {
 
     Task task = (Task) o;
 
-    if (containerId != task.containerId) {
+    if (containerId == null ? task.containerId != null : !containerId.equals(task.containerId)) { // null-safe: unequal only when the ids differ
       return false;
     }
     if (!preferredHost.equals(task.preferredHost)) {
@@ -129,7 +129,7 @@ public class Task {
   public int hashCode() {
     int result = preferredHost.hashCode();
     result = 31 * result + taskName.hashCode();
-    result = 31 * result + containerId;
+    result = 31 * result + containerId.hashCode();
     result = 31 * result + partitions.hashCode();
     result = 31 * result + storeNames.hashCode();
     return result;

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index 27c88e5..c40c168 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -91,10 +91,10 @@ public class SamzaTaskProxy implements TaskProxy {
     StorageConfig storageConfig = new StorageConfig(jobModel.getConfig());
 
     List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
-    Map<Integer, String> containerLocality = jobModel.getAllContainerLocality();
+    Map<String, String> containerLocality = jobModel.getAllContainerLocality();
     List<Task> tasks = new ArrayList<>();
     for (ContainerModel containerModel : jobModel.getContainers().values()) {
-      int containerId = containerModel.getContainerId();
+      String containerId = containerModel.getProcessorId();
       String host = containerLocality.get(containerId);
       for (TaskModel taskModel : containerModel.getTasks().values()) {
         String taskName = taskModel.getTaskName().getTaskName();

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index 02ec321..d0b6962 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -64,7 +64,7 @@ public class TestLocalStoreMonitor {
     // Set default return values for methods.
     Mockito.when(jobsClientMock.getJobStatus(Mockito.any()))
            .thenReturn(JobStatus.STOPPED);
-    Task task = new Task("localHost", "test-task", 0,
+    Task task = new Task("localHost", "test-task", "0",
                          new ArrayList<>(), ImmutableList.of("test-store"));
     Mockito.when(jobsClientMock.getTasks(Mockito.any()))
            .thenReturn(ImmutableList.of(task));
@@ -136,7 +136,7 @@ public class TestLocalStoreMonitor {
   // TODO: Fix in SAMZA-1183
   //@Test
   public void shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost() throws Exception {
-    Task task = new Task("notLocalHost", "test-task", 0,
+    Task task = new Task("notLocalHost", "test-task", "0",
                          new ArrayList<>(), ImmutableList.of("test-store"));
     Mockito.when(jobsClientMock.getTasks(Mockito.any()))
            .thenReturn(ImmutableList.of(task));

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
index 45f252a..de741ba 100644
--- a/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
+++ b/samza-rest/src/test/java/org/apache/samza/rest/resources/mock/MockTaskProxy.java
@@ -41,11 +41,11 @@ public class MockTaskProxy extends SamzaTaskProxy {
       new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME, new Partition(PARTITION_ID)));
 
   public static final String TASK_1_NAME = "Task1";
-  public static final int TASK_1_CONTAINER_ID = 1;
+  public static final String TASK_1_CONTAINER_ID = "1";
   public static final Partition CHANGE_LOG_PARTITION = new Partition(0);
 
   public static final String TASK_2_NAME = "Task2";
-  public static final int TASK_2_CONTAINER_ID = 2;
+  public static final String TASK_2_CONTAINER_ID = "2";
 
   public MockTaskProxy() {
     super(new TaskResourceConfig(new MapConfig()),
@@ -60,10 +60,10 @@ public class MockTaskProxy extends SamzaTaskProxy {
     }
     TaskModel task1Model = new TaskModel(new TaskName(TASK_1_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
     TaskModel task2Model = new TaskModel(new TaskName(TASK_2_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
-    ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID,
+    ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID, 1,
                                                             ImmutableMap.of(new TaskName(TASK_1_NAME),
                                                                             task1Model));
-    ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID,
+    ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID, 2,
                                                             ImmutableMap.of(new TaskName(TASK_2_NAME),
                                                                             task2Model));
     return new JobModel(new MapConfig(), ImmutableMap.of(TASK_1_CONTAINER_ID, task1ContainerModel,

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 5d1b497..8e853b7 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -116,7 +116,7 @@ object TestKeyValuePerformance extends Logging {
           new TaskInstanceCollector(producerMultiplexer),
           new MetricsRegistryMap,
           null,
-          new SamzaContainerContext(0, config, taskNames)
+          new SamzaContainerContext("0", config, taskNames)
         )
 
         val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index 2c44aea..070e7a7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -65,11 +65,11 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final String outputTopic = "output";
     final int messageCount = 20;
 
-    final Config configs = new MapConfig(createConfigs(testSystem, inputTopic, outputTopic, messageCount));
+    final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
     // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
     // TopicExistsException since StreamProcessor auto-creates them.
     createTopics(inputTopic, outputTopic);
-    final StreamProcessor processor = new StreamProcessor(1, new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new);
+    final StreamProcessor processor = new StreamProcessor(new MapConfig(configs), new HashMap<>(), IdentityStreamTask::new);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -86,10 +86,10 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final String outputTopic = "output2";
     final int messageCount = 20;
 
-    final Config configs = new MapConfig(createConfigs(testSystem, inputTopic, outputTopic, messageCount));
+    final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
     createTopics(inputTopic, outputTopic);
     final StreamTaskFactory stf = IdentityStreamTask::new;
-    final StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), stf);
+    final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -106,11 +106,11 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final String outputTopic = "output3";
     final int messageCount = 20;
 
-    final Config configs = new MapConfig(createConfigs(testSystem, inputTopic, outputTopic, messageCount));
+    final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
     final ExecutorService executorService = Executors.newSingleThreadExecutor();
     createTopics(inputTopic, outputTopic);
     final AsyncStreamTaskFactory stf = () -> new AsyncStreamTaskAdapter(new IdentityStreamTask(), executorService);
-    final StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), stf);
+    final StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), stf);
 
     produceMessages(inputTopic, messageCount);
     run(processor, endLatch);
@@ -128,11 +128,11 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     final String outputTopic = "output4";
     final int messageCount = 20;
 
-    final Map<String, String> configMap = createConfigs(testSystem, inputTopic, outputTopic, messageCount);
+    final Map<String, String> configMap = createConfigs("1", testSystem, inputTopic, outputTopic, messageCount);
     configMap.remove("task.class");
     final Config configs = new MapConfig(configMap);
 
-    StreamProcessor processor = new StreamProcessor(1, configs, new HashMap<>(), (StreamTaskFactory) null);
+    StreamProcessor processor = new StreamProcessor(configs, new HashMap<>(), (StreamTaskFactory) null);
     run(processor, endLatch);
   }
 
@@ -141,7 +141,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     TestUtils.createTopic(zkUtils(), outputTopic, 1, 1, servers(), new Properties());
   }
 
-  private Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic, int messageCount) {
+  private Map<String, String> createConfigs(String processorId, String testSystem, String inputTopic, String outputTopic, int messageCount) {
     Map<String, String> configs = new HashMap<>();
     configs.putAll(
         StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask"));
@@ -152,6 +152,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
     configs.put("app.outputTopic", outputTopic);
     configs.put("app.outputSystem", testSystem);
     configs.put(ZkConfig.ZK_CONNECT, zkConnect());
+    configs.put("processor.id", processorId);
     return configs;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 7a107f6..cda2690 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -35,7 +35,7 @@ import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.samza.config.{Config, KafkaProducerConfig, MapConfig}
+import org.apache.samza.config.{ApplicationConfig, Config, KafkaProducerConfig, MapConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
@@ -81,6 +81,7 @@ object StreamTaskTestUtil {
   var jobConfig = Map(
     "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
     "job.coordinator.system" -> "kafka",
+    ApplicationConfig.PROCESSOR_ID -> "1",
     "task.inputs" -> "kafka.input",
     "serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory",
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
index 7e563f1..db67de6 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnAppState.java
@@ -49,7 +49,7 @@ public class YarnAppState {
   * Modified by both the AMRMCallbackThread and the ContainerAllocator thread
   */
 
-  public Map<Integer, YarnContainer> runningYarnContainers = new ConcurrentHashMap<Integer, YarnContainer>()  ;
+  public Map<String, YarnContainer> runningYarnContainers = new ConcurrentHashMap<String, YarnContainer>()  ;
 
   public ConcurrentMap<String, ContainerStatus> failedContainersStatus = new ConcurrentHashMap<String, ContainerStatus>();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index ae171c7..96a4488 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -63,7 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class YarnClusterResourceManager extends ClusterResourceManager implements AMRMClientAsync.CallbackHandler {
 
-  private final int INVALID_YARN_CONTAINER_ID = -1;
+  private final String INVALID_YARN_CONTAINER_ID = "-1";
 
   /**
    * The containerProcessManager instance to request resources from yarn.
@@ -264,8 +264,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   @Override
   public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
     String containerIDStr = builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
-    int containerID = Integer.parseInt(containerIDStr);
-    log.info("Received launch request for {} on hostname {}", containerID , resource.getHost());
+    log.info("Received launch request for {} on hostname {}", containerIDStr , resource.getHost());
 
     synchronized (lock) {
       Container container = allocatedResources.get(resource);
@@ -274,8 +273,8 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
         return;
       }
 
-      state.runningYarnContainers.put(containerID, new YarnContainer(container));
-      yarnContainerRunner.runContainer(containerID, container, builder);
+      state.runningYarnContainers.put(containerIDStr, new YarnContainer(container));
+      yarnContainerRunner.runContainer(containerIDStr, container, builder);
     }
   }
 
@@ -290,10 +289,10 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   //In that case, this scan will turn into a lookup. This change will require changes/testing in the UI files because
   //those UI stub templates operate on the YarnContainer object.
 
-  private int getIDForContainer(String lookupContainerId) {
-    int samzaContainerID = INVALID_YARN_CONTAINER_ID;
-    for(Map.Entry<Integer, YarnContainer> entry : state.runningYarnContainers.entrySet()) {
-      Integer key = entry.getKey();
+  private String getIDForContainer(String lookupContainerId) {
+    String samzaContainerID = INVALID_YARN_CONTAINER_ID;
+    for(Map.Entry<String, YarnContainer> entry : state.runningYarnContainers.entrySet()) {
+      String key = entry.getKey();
       YarnContainer yarnContainer = entry.getValue();
       String yarnContainerId = yarnContainer.id().toString();
       if(yarnContainerId.equals(lookupContainerId)) {
@@ -319,7 +318,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     synchronized (lock) {
       AMRMClient.ContainerRequest containerRequest = requestsMap.get(request);
       if (containerRequest == null) {
-        log.info("Cancellation of {} already done. ", containerRequest);
+        log.info("Cancellation of {} already done. ", request);
         return;
       }
       requestsMap.remove(request);
@@ -386,12 +385,12 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
       SamzaResourceStatus samzaResrcStatus = new SamzaResourceStatus(status.getContainerId().toString(), status.getDiagnostics(), status.getExitStatus());
       samzaResrcStatuses.add(samzaResrcStatus);
 
-      int completedContainerID = getIDForContainer(status.getContainerId().toString());
+      String completedContainerID = getIDForContainer(status.getContainerId().toString());
       log.info("Completed container had ID: {}", completedContainerID);
 
       //remove the container from the list of running containers, if failed with a non-zero exit code, add it to the list of
       //failed containers.
-      if(completedContainerID != INVALID_YARN_CONTAINER_ID){
+      if(!completedContainerID.equals(INVALID_YARN_CONTAINER_ID)){
         if(state.runningYarnContainers.containsKey(completedContainerID)) {
           log.info("Removing container ID {} from completed containers", completedContainerID);
           state.runningYarnContainers.remove(completedContainerID);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index c45fc7f..84ded62 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -89,7 +89,7 @@ public class YarnContainerRunner {
    * @throws SamzaContainerLaunchException  when there's an exception in submitting the request to the RM.
    *
    */
-  public void runContainer(int samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
+  public void runContainer(String samzaContainerId, Container container, CommandBuilder cmdBuilder) throws SamzaContainerLaunchException {
     String containerIdStr = ConverterUtils.toString(container.getId());
     log.info("Got available container ID ({}) for container: {}", samzaContainerId, container);
 
@@ -229,7 +229,7 @@ public class YarnContainerRunner {
    * @param samzaContainerId  the Samza container Id for logging purposes.
    * @param env               the Map of environment variables to their respective values.
    */
-  private void printContainerEnvironmentVariables(int samzaContainerId, Map<String, String> env) {
+  private void printContainerEnvironmentVariables(String samzaContainerId, Map<String, String> env) {
     StringBuilder sb = new StringBuilder();
     for (Map.Entry<String, String> entry : env.entrySet()) {
       sb.append(String.format("\n%s=%s", entry.getKey(), entry.getValue()));

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 313de94..c1b1302 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -151,9 +151,9 @@ public class YarnJobValidationTool {
   public void validateJmxMetrics() throws Exception {
     JobModelManager jobModelManager = JobModelManager.apply(config);
     validator.init(config);
-    Map<Integer, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-    for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) {
-      Integer containerId = entry.getKey();
+    Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+    for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
+      String containerId = entry.getKey();
       String jmxUrl = entry.getValue();
       log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
       JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);


[2/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza

Posted by na...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index b88753f..e3e21e9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.samza.zk;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -28,9 +29,11 @@ import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -51,7 +54,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private static final String JOB_MODEL_VERSION_BARRIER = "JobModelVersion";
 
   private final ZkUtils zkUtils;
-  private final int processorId;
+  private final String processorId;
+
   private final ZkController zkController;
   private final SamzaContainerController containerController;
   private final ScheduleAfterDebounceTime debounceTimer;
@@ -64,14 +68,22 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private String newJobModelVersion;  // version published in ZK (by the leader)
   private JobModel jobModel;
 
-  public ZkJobCoordinator(int processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
-      SamzaContainerController containerController) {
+  public ZkJobCoordinator(String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+                          SamzaContainerController containerController) {
     this.zkUtils = zkUtils;
     this.keyBuilder = zkUtils.getKeyBuilder();
     this.debounceTimer = debounceTimer;
-    this.processorId = processorId;
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {    // TODO: This check to be removed after 0.13+
+      this.processorId = appConfig.getProcessorId();
+    } else {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(
+              new ApplicationConfig(config).getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      this.processorId = idGenerator.generateProcessorId(config);
+    }
     this.containerController = containerController;
-    this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
+    this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
     this.config = config;
     this.coordinationUtils = Util.
         <CoordinationServiceFactory>getObj(
@@ -90,7 +102,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
       if (systemFactoryClassName == null) {
         String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
-        log.error(String.format(msg));
+        log.error(msg);
         throw new SamzaException(msg);
       }
       SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
@@ -105,10 +117,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     zkController.register();
   }
 
-  public void cleanupZk() {
-    zkUtils.deleteRoot();
-  }
-
   @Override
   public void stop() {
     zkController.stop();
@@ -123,7 +131,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   }
 
   @Override
-  public int getProcessorId() {
+  public String getProcessorId() {
     return processorId;
   }
 
@@ -204,14 +212,12 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
 
-    StringBuilder sb = new StringBuilder();
-    List<Integer> containerIds = new ArrayList<>();
+    List<String> containerIds = new ArrayList<>();
     for (String processor : currentProcessors) {
-      String zkProcessorId = keyBuilder.parseIdFromPath(processor);
-      sb.append(zkProcessorId).append(",");
-      containerIds.add(Integer.valueOf(zkProcessorId));
+      String zkProcessorId = ZkKeyBuilder.parseIdFromPath(processor);
+      containerIds.add(zkProcessorId);
     }
-    log.info("generate new job model: processorsIds: " + sb.toString());
+    log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
 
     jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
         containerIds);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 915866d..6206baf 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -31,26 +31,22 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   /**
    * Method to instantiate an implementation of JobCoordinator
    *
-   * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with
-   * @param config      Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @param config  Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
    * @return An instance of IJobCoordinator
    */
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+  public JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController) {
     JobConfig jobConfig = new JobConfig(config);
     String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
     ZkConfig zkConfig = new ZkConfig(config);
-    String processorIdStr = String.valueOf(processorId);
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
     ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
 
     return new ZkJobCoordinator(
-        processorId,
         "groupId",  // TODO: Usage of groupId to be resolved in SAMZA-1173
-        config,
+         config,
         debounceTimer,
         new ZkUtils(
-            processorIdStr,
             new ZkKeyBuilder(groupName),
             zkClient,
             zkConfig.getZkConnectionTimeoutMs()

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 7a9b4d5..d77aab2 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -64,13 +64,11 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
-  private final String processorId;
 
-  public ZkUtils(String processorId, ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.zkClient = zkClient;
-    this.processorId = processorId;
   }
 
   public void connect() throws ZkInterruptedException {
@@ -160,7 +158,7 @@ public class ZkUtils {
     * @param dataListener describe this
     */
   public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
-    LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
+    LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
     zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
   }
 
@@ -175,7 +173,7 @@ public class ZkUtils {
     try {
       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
-      LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr);
+      LOG.info("jobModelAsString=" + jobModelStr);
       zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr);
       LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion));
     } catch (Exception e) {
@@ -190,7 +188,7 @@ public class ZkUtils {
    * @return job model for this version
    */
   public JobModel getJobModel(String jobModelVersion) {
-    LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
+    LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
     Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
     JobModel jm;
@@ -218,7 +216,7 @@ public class ZkUtils {
   public void publishJobModelVersion(String oldVersion, String newVersion) {
     Stat stat = new Stat();
     String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
-    LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
+    LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
         .getVersion() + ")");
 
     if (currentVersion != null && !currentVersion.equals(oldVersion)) {
@@ -234,9 +232,8 @@ public class ZkUtils {
       LOG.error(msg, e);
       throw new SamzaException(msg);
     }
-    LOG.info("pid=" + processorId +
-        " published new version: " + newVersion + "; expected data version = " + (dataVersion  + 1) + "(actual data version after update = " + stat.getVersion()
-        +    ")");
+    LOG.info("published new version: " + newVersion + "; expected data version = " + (dataVersion  + 1) +
+        "(actual data version after update = " + stat.getVersion() +    ")");
   }
 
 
@@ -257,14 +254,14 @@ public class ZkUtils {
    * @param listener - will be called when a processor is added or removed.
    */
   public void subscribeToProcessorChange(IZkChildListener listener) {
-    LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
+    LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
   }
 
   public void deleteRoot() {
     String rootPath = keyBuilder.getRootPath();
     if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
-      LOG.info("pid=" + processorId + " Deleteing root: " + rootPath);
+      LOG.info("Deleteing root: " + rootPath);
       zkClient.deleteRecursive(rootPath);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index f505322..1397ed5 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -26,8 +26,7 @@ object ShellCommandConfig {
   val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
 
   /**
-   * The ID for a container. This is an integer number between 0 and
-   * &lt;number of containers&gt;.
+   * The ID for a container. This is a string representation that is unique to the runtime environment.
    */
   val ENV_CONTAINER_ID = "SAMZA_CONTAINER_ID"
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 96a337c..aba0d17 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -81,7 +81,7 @@ object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
   val DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms"
 
-  def getLocalityManager(containerId: Int, config: Config): LocalityManager = {
+  def getLocalityManager(containerId: String, config: Config): LocalityManager = {
     val containerName = getSamzaContainerName(containerId)
     val registryMap = new MetricsRegistryMap(containerName)
     val coordinatorSystemProducer =
@@ -108,12 +108,12 @@ object SamzaContainer extends Logging {
         classOf[JobModel])
   }
 
-  def getSamzaContainerName(containerId: Int): String = {
-    "samza-container-%d" format containerId
+  def getSamzaContainerName(containerId: String): String = {
+    "samza-container-%s" format containerId
   }
 
   def apply(
-    containerId: Int,
+    containerId: String,
     containerModel: ContainerModel,
     config: Config,
     maxChangeLogStreamPartitions: Int,

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5e4677f..e39ea3b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -151,7 +151,7 @@ object JobModelManager extends Logging {
                                 localityManager: LocalityManager,
                                 streamMetadataCache: StreamMetadataCache,
                                 streamPartitionCountMonitor: StreamPartitionCountMonitor,
-                                containerIds: java.util.List[Integer]) = {
+                                containerIds: java.util.List[String]) = {
     val jobModel: JobModel = readJobModel(config, changeLogMapping, localityManager, streamMetadataCache, containerIds)
     jobModelRef.set(jobModel)
 
@@ -219,7 +219,7 @@ object JobModelManager extends Logging {
                    changeLogPartitionMapping: util.Map[TaskName, Integer],
                    localityManager: LocalityManager,
                    streamMetadataCache: StreamMetadataCache,
-                   containerIds: java.util.List[Integer]): JobModel = {
+                   containerIds: java.util.List[String]): JobModel = {
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
     val grouper = getSystemStreamPartitionGrouper(config)
@@ -258,7 +258,7 @@ object JobModelManager extends Logging {
         case _ => containerGrouper.group(taskModels.asJava, containerIds)
       }
     }
-    val containerMap = containerModels.asScala.map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
+    val containerMap = containerModels.asScala.map { case (containerModel) => containerModel.getProcessorId -> containerModel }.toMap
 
     new JobModel(config, containerMap.asJava, localityManager)
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 475df52..7a31567 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -64,7 +64,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
 
     commandBuilder
             .setConfig(config)
-            .setId(0)
+            .setId("0")
             .setUrl(coordinator.server.getUrl)
             .setCommandPath(fwkPath)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index f218543..dcef3af 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -42,10 +42,10 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     info("Creating a ThreadJob, which is only meant for debugging.")
     val coordinator = JobModelManager(config)
     val jobModel = coordinator.jobModel
-    val containerModel = jobModel.getContainers.get(0)
+    val containerModel = jobModel.getContainers.get("0")
     val jmxServer = new JmxServer
     val streamApp = TaskFactoryUtil.createStreamApplication(config)
-    val appRunner = new LocalContainerRunner(jobModel, 0)
+    val appRunner = new LocalContainerRunner(jobModel, "0")
     val taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, appRunner)
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
@@ -58,7 +58,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       coordinator.start
       new ThreadJob(
             SamzaContainer(
-              containerModel.getContainerId,
+              containerModel.getProcessorId,
               containerModel,
               config,
               jobModel.maxChangeLogStreamPartitions,

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
index 6189fe7..109ed47 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java
@@ -34,7 +34,7 @@ public class MockContainerAllocator extends ContainerAllocator {
   }
 
   @Override
-  public void requestResources(Map<Integer, String> containerToHostMappings) {
+  public void requestResources(Map<String, String> containerToHostMappings) {
     requestedContainers += containerToHostMappings.size();
     super.requestResources(containerToHostMappings);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
index 5351bc3..989b82a 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java
@@ -98,10 +98,10 @@ public class TestContainerAllocator {
     //That way it becomes easier to mock objects. Save it for later.
 
     HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+    Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
+      ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+      containers.put(String.valueOf(i), container);
     }
     JobModel jobModel = new JobModel(getConfig(), containers);
     return new JobModelManager(jobModel, server, null);
@@ -130,12 +130,12 @@ public class TestContainerAllocator {
    */
   @Test
   public void testRequestContainers() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
       {
-        put(0, "abc");
-        put(1, "def");
-        put(2, null);
-        put(3, "abc");
+        put("0", "abc");
+        put("1", "def");
+        put("2", null);
+        put("3", "abc");
       }
     };
 
@@ -160,9 +160,9 @@ public class TestContainerAllocator {
   @Test
   public void testRequestContainersWithNoMapping() throws Exception {
     int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+    Map<String, String> containersToHostMapping = new HashMap<String, String>();
     for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
+      containersToHostMapping.put(String.valueOf(i), null);
     }
     allocatorThread.start();
 
@@ -208,7 +208,7 @@ public class TestContainerAllocator {
 
     allocatorThread.start();
 
-    containerAllocator.requestResource(0, "abc");
+    containerAllocator.requestResource("0", "abc");
 
     containerAllocator.addResource(resource);
     containerAllocator.addResource(resource1);
@@ -245,11 +245,11 @@ public class TestContainerAllocator {
             assertEquals(2, requestState.assignedRequests.size());
 
             SamzaResourceRequest request = requestState.assignedRequests.remove();
-            assertEquals(0, request.getContainerID());
+            assertEquals("0", request.getContainerID());
             assertEquals("2", request.getPreferredHost());
 
             request = requestState.assignedRequests.remove();
-            assertEquals(0, request.getContainerID());
+            assertEquals("0", request.getContainerID());
             assertEquals("ANY_HOST", request.getPreferredHost());
 
             // This routine should be called after the retry is assigned, but before it's started.
@@ -261,7 +261,7 @@ public class TestContainerAllocator {
     state.neededContainers.set(1);
     requestState.registerContainerListener(listener);
 
-    containerAllocator.requestResource(0, "2");
+    containerAllocator.requestResource("0", "2");
     containerAllocator.addResource(container);
     containerAllocator.addResource(container1);
     allocatorThread.start();

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 0d61814..660012e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -87,15 +87,14 @@ public class TestContainerProcessManager {
 
   private SamzaApplicationState state = null;
 
-
   private JobModelManager getCoordinator(int containerCount) {
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+    Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
+      ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+      containers.put(String.valueOf(i), container);
     }
-    Map<Integer, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put(0, new HashMap<String, String>() { {
+    Map<String, Map<String, String>> localityMap = new HashMap<>();
+    localityMap.put("0", new HashMap<String, String>() { {
         put(SetContainerHostMapping.HOST_KEY, "abc");
       }
     });
@@ -105,9 +104,7 @@ public class TestContainerProcessManager {
     JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
     JobModelManager.jobModelRef().getAndSet(jobModel);
 
-    JobModelManager reader = new JobModelManager(jobModel, this.server, null);
-
-    return reader;
+    return new JobModelManager(jobModel, this.server, null);
   }
 
   @Before
@@ -137,7 +134,8 @@ public class TestContainerProcessManager {
         manager
     );
 
-    AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
+    AbstractContainerAllocator allocator =
+        (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
     assertEquals(ContainerAllocator.class, allocator.getClass());
     // Asserts that samza exposed container configs is honored by allocator thread
     assertEquals(500, allocator.containerMemoryMb);
@@ -155,7 +153,8 @@ public class TestContainerProcessManager {
         manager
     );
 
-    allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
+    allocator =
+        (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
     assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
     // Asserts that samza exposed container configs is honored by allocator thread
     assertEquals(500, allocator.containerMemoryMb);
@@ -244,6 +243,7 @@ public class TestContainerProcessManager {
     assertTrue(taskManager.shouldShutdown());
   }
 
+
   /**
    * Test Task Manager should request a new container when a task fails with unknown exit code
    * When host-affinity is not enabled, it will always request for ANY_HOST
@@ -317,6 +317,7 @@ public class TestContainerProcessManager {
     taskManager.stop();
   }
 
+
   /**
    * Test AM requests a new container when a task fails
    * Error codes with same behavior - Disk failure, preemption and aborted
@@ -416,8 +417,6 @@ public class TestContainerProcessManager {
     taskManager1.onResourceAllocated(container2);
   }
 
-
-
   @After
   public void teardown() {
     server.stop();

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index 7a514e8..3d52510 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -40,7 +40,7 @@ public class TestContainerRequestState {
   public void testUpdateRequestState() {
     // Host-affinity is enabled
     ResourceRequestState state = new ResourceRequestState(true, manager);
-    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
+    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
     state.addResourceRequest(request);
 
     assertNotNull(manager.resourceRequests);
@@ -57,7 +57,7 @@ public class TestContainerRequestState {
 
     // Host-affinity is not enabled
     ResourceRequestState state1 = new ResourceRequestState(false, manager);
-    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, 1);
+    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, "1");
     state1.addResourceRequest(request1);
 
     assertNotNull(manager.resourceRequests);
@@ -71,7 +71,6 @@ public class TestContainerRequestState {
 
   }
 
-
   /**
    * Test addContainer() updates the state correctly
    */
@@ -102,7 +101,7 @@ public class TestContainerRequestState {
     assertEquals(container1, state1.getResourcesOnAHost(ANY_HOST).get(0));
 
     // Container Allocated on a Requested Host
-    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", 0));
+    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", "0"));
 
     assertEquals(1, state1.numPendingRequests());
 
@@ -143,9 +142,9 @@ public class TestContainerRequestState {
   public void testContainerAssignment() throws Exception {
     // Host-affinity enabled
     ResourceRequestState state = new ResourceRequestState(true, manager);
-    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
+    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
 
-    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", 0);
+    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", "0");
 
     state.addResourceRequest(request);
     state.addResourceRequest(request1);
@@ -194,5 +193,4 @@ public class TestContainerRequestState {
 
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index b6651f2..83d31e2 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -65,16 +65,15 @@ public class TestHostAwareContainerAllocator {
     allocatorThread = new Thread(containerAllocator);
   }
 
-
   /**
    * Test request containers with no containerToHostMapping makes the right number of requests
    */
   @Test
   public void testRequestContainersWithNoMapping() throws Exception {
     int containerCount = 4;
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>();
+    Map<String, String> containersToHostMapping = new HashMap<String, String>();
     for (int i = 0; i < containerCount; i++) {
-      containersToHostMapping.put(i, null);
+      containersToHostMapping.put(String.valueOf(i), null);
     }
 
     allocatorThread.start();
@@ -95,10 +94,10 @@ public class TestHostAwareContainerAllocator {
    */
   @Test
   public void testAddContainerWithHostAffinity() throws Exception {
-    containerAllocator.requestResources(new HashMap<Integer, String>() {
+    containerAllocator.requestResources(new HashMap<String, String>() {
       {
-        put(0, "abc");
-        put(1, "xyz");
+        put("0", "abc");
+        put("1", "xyz");
       }
     });
 
@@ -153,7 +152,7 @@ public class TestHostAwareContainerAllocator {
 
     allocatorThread.start();
 
-    containerAllocator.requestResource(0, "abc");
+    containerAllocator.requestResource("0", "abc");
 
     containerAllocator.addResource(resource0);
     containerAllocator.addResource(resource1);
@@ -162,17 +161,14 @@ public class TestHostAwareContainerAllocator {
     listener.verify();
   }
 
-
-
-
   @Test
   public void testRequestContainers() throws Exception {
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
       {
-        put(0, "abc");
-        put(1, "def");
-        put(2, null);
-        put(3, "abc");
+        put("0", "abc");
+        put("1", "def");
+        put("2", null);
+        put("3", "abc");
       }
     };
 
@@ -221,11 +217,11 @@ public class TestHostAwareContainerAllocator {
             assertEquals(2, requestState.assignedRequests.size());
 
             SamzaResourceRequest request = requestState.assignedRequests.remove();
-            assertEquals(0, request.getContainerID());
+            assertEquals("0", request.getContainerID());
             assertEquals("2", request.getPreferredHost());
 
             request = requestState.assignedRequests.remove();
-            assertEquals(0, request.getContainerID());
+            assertEquals("0", request.getContainerID());
             assertEquals("ANY_HOST", request.getPreferredHost());
 
             // This routine should be called after the retry is assigned, but before it's started.
@@ -238,7 +234,7 @@ public class TestHostAwareContainerAllocator {
     requestState.registerContainerListener(listener);
 
     // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry)
-    containerAllocator.requestResource(0, "2");
+    containerAllocator.requestResource("0", "2");
     containerAllocator.addResource(container1);
     containerAllocator.addResource(container);
 
@@ -257,10 +253,10 @@ public class TestHostAwareContainerAllocator {
     final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");
     final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2");
 
-    Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() {
+    Map<String, String> containersToHostMapping = new HashMap<String, String>() {
       {
-        put(0, "abc");
-        put(1, "def");
+        put("0", "abc");
+        put("1", "def");
       }
     };
     containerAllocator.requestResources(containersToHostMapping);
@@ -315,7 +311,6 @@ public class TestHostAwareContainerAllocator {
     containerAllocator.stop();
   }
 
-
   private static Config getConfig() {
     Config config = new MapConfig(new HashMap<String, String>() {
       {
@@ -344,10 +339,10 @@ public class TestHostAwareContainerAllocator {
     //That way it becomes easier to mock objects. Save it for later.
 
     HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
-    Map<Integer, ContainerModel> containers = new java.util.HashMap<>();
+    Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
-      ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>());
-      containers.put(i, container);
+      ContainerModel container = new ContainerModel(String.valueOf(i), i, new HashMap<TaskName, TaskModel>());
+      containers.put(String.valueOf(i), container);
     }
     JobModel jobModel = new JobModel(getConfig(), containers);
     return new JobModelManager(jobModel, server, null);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index 5341141..07f721d 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -84,12 +84,12 @@ public class TestLocalityManager {
     assertTrue(producer.isStarted());
     assertTrue(consumer.isStarted());
 
-    localityManager.writeContainerToHostMapping(0, "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
-    Map<Integer, Map<String, String>> localMap = localityManager.readContainerLocality();
-    Map<Integer, Map<String, String>> expectedMap =
-      new HashMap<Integer, Map<String, String>>() {
+    localityManager.writeContainerToHostMapping("0", "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
+    Map<String, Map<String, String>> localMap = localityManager.readContainerLocality();
+    Map<String, Map<String, String>> expectedMap =
+      new HashMap<String, Map<String, String>>() {
         {
-          this.put(new Integer(0),
+          this.put("0",
             new HashMap<String, String>() {
               {
                 this.put(SetContainerHostMapping.HOST_KEY, "localhost");
@@ -118,7 +118,7 @@ public class TestLocalityManager {
     localityManager.start();
     assertTrue(producer.isStarted());
 
-    localityManager.writeContainerToHostMapping(1, "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
+    localityManager.writeContainerToHostMapping("1", "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
     try {
       localityManager.readContainerLocality();
       fail("Should have thrown UnsupportedOperationException");

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
index 3fd39d7..de4de7c 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
@@ -18,16 +18,20 @@
  */
 package org.apache.samza.container.grouper.task;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
 import static org.apache.samza.container.mock.ContainerMocks.generateTaskContainerMapping;
 import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels;
 import static org.apache.samza.container.mock.ContainerMocks.getTaskModel;
@@ -35,12 +39,16 @@ import static org.apache.samza.container.mock.ContainerMocks.getTaskName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyCollection;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestGroupByContainerCount {
   private TaskAssignmentManager taskAssignmentManager;
   private LocalityManager localityManager;
-
   @Before
   public void setup() {
     taskAssignmentManager = mock(TaskAssignmentManager.class);
@@ -73,18 +81,18 @@ public class TestGroupByContainerCount {
 
     Set<ContainerModel> containers = new GroupByContainerCount(2).group(taskModels);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -100,18 +108,18 @@ public class TestGroupByContainerCount {
 
     Set<ContainerModel> containers = new GroupByContainerCount(2).group(taskModels);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(11, container0.getTasks().size());
     assertEquals(10, container1.getTasks().size());
 
@@ -167,27 +175,27 @@ public class TestGroupByContainerCount {
   public void testBalancerAfterContainerIncrease() {
     Set<TaskModel> taskModels = generateTaskModels(9);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(4).balance(taskModels, localityManager);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(4, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
-    ContainerModel container2 = containersMap.get(2);
-    ContainerModel container3 = containersMap.get(3);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
+    ContainerModel container2 = containersMap.get("2");
+    ContainerModel container3 = containersMap.get("3");
     assertNotNull(container0);
     assertNotNull(container1);
     assertNotNull(container2);
     assertNotNull(container3);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertEquals(2, container2.getTasks().size());
@@ -207,18 +215,18 @@ public class TestGroupByContainerCount {
     assertTrue(container3.getTasks().containsKey(getTaskName(7)));
 
     // Verify task mappings are saved
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 1);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "1");
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), 2);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), 2);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), "2");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), "2");
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 3);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), 3);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "3");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), "3");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -249,23 +257,23 @@ public class TestGroupByContainerCount {
   public void testBalancerAfterContainerDecrease() {
     Set<TaskModel> taskModels = generateTaskModels(9);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(4).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(5, container0.getTasks().size());
     assertEquals(4, container1.getTasks().size());
 
@@ -284,16 +292,16 @@ public class TestGroupByContainerCount {
     assertTrue(container1.getTasks().containsKey(getTaskName(3)));
 
     // Verify task mappings are saved
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 1);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "1");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -327,24 +335,24 @@ public class TestGroupByContainerCount {
     // Before
     Set<TaskModel> taskModels = generateTaskModels(9);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(4).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     // First balance
     Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(5, container0.getTasks().size());
     assertEquals(4, container1.getTasks().size());
 
@@ -363,16 +371,16 @@ public class TestGroupByContainerCount {
     assertTrue(container1.getTasks().containsKey(getTaskName(3)));
 
     // Verify task mappings are saved
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(8).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(6).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 1);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(7).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "1");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
 
@@ -389,19 +397,19 @@ public class TestGroupByContainerCount {
 
     containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(3, containers.size());
-    container0 = containersMap.get(0);
-    container1 = containersMap.get(1);
-    ContainerModel container2 = containersMap.get(2);
+    container0 = containersMap.get("0");
+    container1 = containersMap.get("1");
+    ContainerModel container2 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
     assertNotNull(container2);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
-    assertEquals(2, container2.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
+    assertEquals("2", container2.getProcessorId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(3, container1.getTasks().size());
     assertEquals(3, container2.getTasks().size());
@@ -421,17 +429,17 @@ public class TestGroupByContainerCount {
     assertTrue(container2.getTasks().containsKey(getTaskName(3)));
 
     // Verify task mappings are saved
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(4).getTaskName(), 0);
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(8).getTaskName(), 0);
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(4).getTaskName(), "0");
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(8).getTaskName(), "0");
 
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(5).getTaskName(), 1);
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(7).getTaskName(), 1);
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(5).getTaskName(), "1");
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(7).getTaskName(), "1");
 
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(6).getTaskName(), 2);
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(2).getTaskName(), 2);
-    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(3).getTaskName(), 2);
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(6).getTaskName(), "2");
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(2).getTaskName(), "2");
+    verify(taskAssignmentManager2).writeTaskContainerMapping(getTaskName(3).getTaskName(), "2");
 
     verify(taskAssignmentManager2, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -459,23 +467,23 @@ public class TestGroupByContainerCount {
   public void testBalancerAfterContainerSame() {
     Set<TaskModel> taskModels = generateTaskModels(9);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(5, container0.getTasks().size());
     assertEquals(4, container1.getTasks().size());
 
@@ -489,7 +497,7 @@ public class TestGroupByContainerCount {
     assertTrue(container1.getTasks().containsKey(getTaskName(5)));
     assertTrue(container1.getTasks().containsKey(getTaskName(7)));
 
-    verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyInt());
+    verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyString());
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
 
@@ -520,32 +528,32 @@ public class TestGroupByContainerCount {
   public void testBalancerAfterContainerSameCustomAssignment() {
     Set<TaskModel> taskModels = generateTaskModels(9);
 
-    Map<String, Integer> prevTaskToContainerMapping = new HashMap<>();
-    prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(6).getTaskName(), 1);
-    prevTaskToContainerMapping.put(getTaskName(7).getTaskName(), 1);
-    prevTaskToContainerMapping.put(getTaskName(8).getTaskName(), 1);
+    Map<String, String> prevTaskToContainerMapping = new HashMap<>();
+    prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(6).getTaskName(), "1");
+    prevTaskToContainerMapping.put(getTaskName(7).getTaskName(), "1");
+    prevTaskToContainerMapping.put(getTaskName(8).getTaskName(), "1");
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(6, container0.getTasks().size());
     assertEquals(3, container1.getTasks().size());
 
@@ -559,7 +567,7 @@ public class TestGroupByContainerCount {
     assertTrue(container1.getTasks().containsKey(getTaskName(7)));
     assertTrue(container1.getTasks().containsKey(getTaskName(8)));
 
-    verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyInt());
+    verify(taskAssignmentManager, never()).writeTaskContainerMapping(anyString(), anyString());
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
 
@@ -589,32 +597,32 @@ public class TestGroupByContainerCount {
   public void testBalancerAfterContainerSameCustomAssignmentAndContainerIncrease() {
     Set<TaskModel> taskModels = generateTaskModels(6);
 
-    Map<String, Integer> prevTaskToContainerMapping = new HashMap<>();
-    prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), 0);
-    prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), 1);
-    prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), 1);
-    prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), 1);
-    prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), 1);
-    prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), 1);
+    Map<String, String> prevTaskToContainerMapping = new HashMap<>();
+    prevTaskToContainerMapping.put(getTaskName(0).getTaskName(), "0");
+    prevTaskToContainerMapping.put(getTaskName(1).getTaskName(), "1");
+    prevTaskToContainerMapping.put(getTaskName(2).getTaskName(), "1");
+    prevTaskToContainerMapping.put(getTaskName(3).getTaskName(), "1");
+    prevTaskToContainerMapping.put(getTaskName(4).getTaskName(), "1");
+    prevTaskToContainerMapping.put(getTaskName(5).getTaskName(), "1");
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(3, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
-    ContainerModel container2 = containersMap.get(2);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
+    ContainerModel container2 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
     assertNotNull(container2);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
-    assertEquals(2, container2.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
+    assertEquals("2", container2.getProcessorId());
     assertEquals(2, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertEquals(2, container1.getTasks().size());
@@ -626,12 +634,12 @@ public class TestGroupByContainerCount {
     assertTrue(container2.getTasks().containsKey(getTaskName(4)));
     assertTrue(container2.getTasks().containsKey(getTaskName(3)));
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), 2);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), 2);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(3).getTaskName(), "2");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(4).getTaskName(), "2");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(5).getTaskName(), "0");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -640,26 +648,26 @@ public class TestGroupByContainerCount {
   public void testBalancerOldContainerCountOne() {
     Set<TaskModel> taskModels = generateTaskModels(3);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(1).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(3).balance(taskModels, localityManager);
 
     // Results should be the same as calling group()
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
     assertEquals(3, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
-    ContainerModel container2 = containersMap.get(2);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
+    ContainerModel container2 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
     assertNotNull(container2);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
-    assertEquals(2, container2.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
+    assertEquals("2", container2.getProcessorId());
     assertEquals(1, container0.getTasks().size());
     assertEquals(1, container1.getTasks().size());
     assertEquals(1, container2.getTasks().size());
@@ -669,9 +677,9 @@ public class TestGroupByContainerCount {
     assertTrue(container2.getTasks().containsKey(getTaskName(2)));
 
     // Verify task mappings are saved
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 1);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 2);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "1");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "2");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -680,30 +688,30 @@ public class TestGroupByContainerCount {
   public void testBalancerNewContainerCountOne() {
     Set<TaskModel> taskModels = generateTaskModels(3);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
     // Results should be the same as calling group
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get(0);
+    ContainerModel container0 = containersMap.get("0");
     assertNotNull(container0);
-    assertEquals(0, container0.getContainerId());
+    assertEquals("0", container0.getProcessorId());
     assertEquals(3, container0.getTasks().size());
 
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
     assertTrue(container0.getTasks().containsKey(getTaskName(1)));
     assertTrue(container0.getTasks().containsKey(getTaskName(2)));
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -711,29 +719,29 @@ public class TestGroupByContainerCount {
   @Test
   public void testBalancerEmptyTaskMapping() {
     Set<TaskModel> taskModels = generateTaskModels(3);
-    when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, Integer>());
+    when(taskAssignmentManager.readTaskAssignment()).thenReturn(new HashMap<String, String>());
 
     Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
     // Results should be the same as calling group
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get(0);
+    ContainerModel container0 = containersMap.get("0");
     assertNotNull(container0);
-    assertEquals(0, container0.getContainerId());
+    assertEquals("0", container0.getProcessorId());
     assertEquals(3, container0.getTasks().size());
 
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
     assertTrue(container0.getTasks().containsKey(getTaskName(1)));
     assertTrue(container0.getTasks().containsKey(getTaskName(2)));
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
 
     verify(taskAssignmentManager, never()).deleteTaskContainerMappings(anyCollection());
   }
@@ -743,30 +751,30 @@ public class TestGroupByContainerCount {
     int taskCount = 3;
     Set<TaskModel> taskModels = generateTaskModels(taskCount);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(2).group(generateTaskModels(taskCount - 1));
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
     // Results should be the same as calling group
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get(0);
+    ContainerModel container0 = containersMap.get("0");
     assertNotNull(container0);
-    assertEquals(0, container0.getContainerId());
+    assertEquals("0", container0.getProcessorId());
     assertEquals(3, container0.getTasks().size());
 
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
     assertTrue(container0.getTasks().containsKey(getTaskName(1)));
     assertTrue(container0.getTasks().containsKey(getTaskName(2)));
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
 
     verify(taskAssignmentManager).deleteTaskContainerMappings(anyCollection());
   }
@@ -776,30 +784,30 @@ public class TestGroupByContainerCount {
     int taskCount = 3;
     Set<TaskModel> taskModels = generateTaskModels(taskCount);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(generateTaskModels(taskCount + 1));
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(1).balance(taskModels, localityManager);
 
     // Results should be the same as calling group
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(1, containers.size());
-    ContainerModel container0 = containersMap.get(0);
+    ContainerModel container0 = containersMap.get("0");
     assertNotNull(container0);
-    assertEquals(0, container0.getContainerId());
+    assertEquals("0", container0.getProcessorId());
     assertEquals(3, container0.getTasks().size());
 
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
     assertTrue(container0.getTasks().containsKey(getTaskName(1)));
     assertTrue(container0.getTasks().containsKey(getTaskName(2)));
 
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), 0);
-    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), 0);
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(0).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(1).getTaskName(), "0");
+    verify(taskAssignmentManager).writeTaskContainerMapping(getTaskName(2).getTaskName(), "0");
 
     verify(taskAssignmentManager).deleteTaskContainerMappings(anyCollection());
   }
@@ -808,7 +816,7 @@ public class TestGroupByContainerCount {
   public void testBalancerNewContainerCountGreaterThanTasks() {
     Set<TaskModel> taskModels = generateTaskModels(3);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     new GroupByContainerCount(5).balance(taskModels, localityManager);     // Should throw
@@ -818,7 +826,7 @@ public class TestGroupByContainerCount {
   public void testBalancerEmptyTasks() {
     Set<TaskModel> taskModels = generateTaskModels(3);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     new GroupByContainerCount(5).balance(new HashSet<TaskModel>(), localityManager);     // Should throw
@@ -828,10 +836,25 @@ public class TestGroupByContainerCount {
   public void testBalancerResultImmutable() {
     Set<TaskModel> taskModels = generateTaskModels(3);
     Set<ContainerModel> prevContainers = new GroupByContainerCount(3).group(taskModels);
-    Map<String, Integer> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
     when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
 
     Set<ContainerModel> containers = new GroupByContainerCount(2).balance(taskModels, localityManager);
     containers.remove(containers.iterator().next());
   }
+
+  @Test(expected = SamzaException.class)
+  public void testBalancerThrowsOnNonIntegerContainerIds() {
+    Set<TaskModel> taskModels = generateTaskModels(3);
+    Set<ContainerModel> prevContainers = new HashSet<>();
+    taskModels.forEach(model -> {
+        prevContainers.add(
+          new ContainerModel(UUID.randomUUID().toString(), -1, Collections.singletonMap(model.getTaskName(), model)));
+      });
+    Map<String, String> prevTaskToContainerMapping = generateTaskContainerMapping(prevContainers);
+    when(taskAssignmentManager.readTaskAssignment()).thenReturn(prevTaskToContainerMapping);
+
+    new GroupByContainerCount(3).balance(taskModels, localityManager); //Should throw
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
index 82f2b7a..62131fe 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
@@ -44,13 +44,11 @@ import static org.mockito.Mockito.when;
 
 
 public class TestGroupByContainerIds {
-  private TaskAssignmentManager taskAssignmentManager;
-  private LocalityManager localityManager;
 
   @Before
   public void setup() {
-    taskAssignmentManager = mock(TaskAssignmentManager.class);
-    localityManager = mock(LocalityManager.class);
+    TaskAssignmentManager taskAssignmentManager = mock(TaskAssignmentManager.class);
+    LocalityManager localityManager = mock(LocalityManager.class);
     when(localityManager.getTaskAssignmentManager()).thenReturn(taskAssignmentManager);
 
 
@@ -94,18 +92,18 @@ public class TestGroupByContainerIds {
 
     Set<ContainerModel> containers = buildSimpleGrouper(2).group(taskModels);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(0);
-    ContainerModel container1 = containersMap.get(1);
+    ContainerModel container0 = containersMap.get("0");
+    ContainerModel container1 = containersMap.get("1");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(0, container0.getContainerId());
-    assertEquals(1, container1.getContainerId());
+    assertEquals("0", container0.getProcessorId());
+    assertEquals("1", container1.getProcessorId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -119,27 +117,27 @@ public class TestGroupByContainerIds {
   public void testGroupHappyPathWithListOfContainers() {
     Set<TaskModel> taskModels = generateTaskModels(5);
 
-    List<Integer> containerIds = new ArrayList<Integer>() {
+    List<String> containerIds = new ArrayList<String>() {
       {
-        add(4);
-        add(2);
+        add("4");
+        add("2");
       }
     };
 
     Set<ContainerModel> containers = buildSimpleGrouper().group(taskModels, containerIds);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(4);
-    ContainerModel container1 = containersMap.get(2);
+    ContainerModel container0 = containersMap.get("4");
+    ContainerModel container1 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(4, container0.getContainerId());
-    assertEquals(2, container1.getContainerId());
+    assertEquals("4", container0.getProcessorId());
+    assertEquals("2", container1.getProcessorId());
     assertEquals(3, container0.getTasks().size());
     assertEquals(2, container1.getTasks().size());
     assertTrue(container0.getTasks().containsKey(getTaskName(0)));
@@ -154,28 +152,28 @@ public class TestGroupByContainerIds {
   public void testGroupManyTasks() {
     Set<TaskModel> taskModels = generateTaskModels(21);
 
-    List<Integer> containerIds = new ArrayList<Integer>() {
+    List<String> containerIds = new ArrayList<String>() {
       {
-        add(4);
-        add(2);
+        add("4");
+        add("2");
       }
     };
 
 
     Set<ContainerModel> containers = buildSimpleGrouper().group(taskModels, containerIds);
 
-    Map<Integer, ContainerModel> containersMap = new HashMap<>();
+    Map<String, ContainerModel> containersMap = new HashMap<>();
     for (ContainerModel container : containers) {
-      containersMap.put(container.getContainerId(), container);
+      containersMap.put(container.getProcessorId(), container);
     }
 
     assertEquals(2, containers.size());
-    ContainerModel container0 = containersMap.get(4);
-    ContainerModel container1 = containersMap.get(2);
+    ContainerModel container0 = containersMap.get("4");
+    ContainerModel container1 = containersMap.get("2");
     assertNotNull(container0);
     assertNotNull(container1);
-    assertEquals(4, container0.getContainerId());
-    assertEquals(2, container1.getContainerId());
+    assertEquals("4", container0.getProcessorId());
+    assertEquals("2", container1.getProcessorId());
     assertEquals(11, container0.getTasks().size());
     assertEquals(10, container1.getTasks().size());
 


[3/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza

Posted by na...@apache.org.
SAMZA-1126 - Semantics of processorId in Samza

Implementation based on [SEP-1](https://cwiki.apache.org/confluence/display/SAMZA/SEP-1%3A+Semantics+of+ProcessorId+in+Samza)

Author: navina <na...@apache.org>

Reviewers: Yi Pan <ni...@gmail.com>, Jacob Maes <jm...@linkedin.com>, Jagadish <ja...@apache.org>

Closes #103 from navina/SAMZA-1126


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7da1840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7da1840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7da1840

Branch: refs/heads/master
Commit: a7da1840f17a182b49c30451f35991c97fc51068
Parents: c74722b
Author: Navina Ramesh <na...@apache.org>
Authored: Fri Apr 7 15:22:13 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Fri Apr 7 15:22:13 2017 -0700

----------------------------------------------------------------------
 .../samza/container/SamzaContainerContext.java  |   4 +-
 .../org/apache/samza/job/CommandBuilder.java    |   4 +-
 .../samza/runtime/ProcessorIdGenerator.java     |  51 +++
 .../AbstractContainerAllocator.java             |  12 +-
 .../clustermanager/ContainerProcessManager.java |  19 +-
 .../HostAwareContainerAllocator.java            |   2 +-
 .../clustermanager/SamzaApplicationState.java   |   2 +-
 .../clustermanager/SamzaResourceRequest.java    |   6 +-
 .../apache/samza/config/ApplicationConfig.java  |  60 +++
 .../apache/samza/container/LocalityManager.java |  12 +-
 .../grouper/task/GroupByContainerCount.java     |  43 ++-
 .../grouper/task/GroupByContainerIds.java       |  10 +-
 .../task/SingleContainerGrouperFactory.java     |   8 +-
 .../grouper/task/TaskAssignmentManager.java     |  10 +-
 .../container/grouper/task/TaskNameGrouper.java |   2 +-
 .../samza/coordinator/JobCoordinator.java       |  22 +-
 .../coordinator/JobCoordinatorFactory.java      |   3 +-
 .../messages/SetTaskContainerMapping.java       |   7 +-
 .../apache/samza/job/model/ContainerModel.java  |  31 +-
 .../org/apache/samza/job/model/JobModel.java    |  26 +-
 .../processor/SamzaContainerController.java     |  20 +-
 .../apache/samza/processor/StreamProcessor.java |  53 +--
 .../samza/runtime/LocalContainerRunner.java     |  10 +-
 .../org/apache/samza/runtime/UUIDGenerator.java |  41 ++
 .../model/JsonContainerModelMixIn.java          |   9 +-
 .../serializers/model/JsonJobModelMixIn.java    |   4 +-
 .../serializers/model/SamzaObjectMapper.java    |  30 +-
 .../standalone/StandaloneJobCoordinator.java    |  30 +-
 .../StandaloneJobCoordinatorFactory.java        |   4 +-
 .../apache/samza/storage/StorageRecovery.java   |   4 +-
 .../apache/samza/util/ClassLoaderHelper.java    |  19 +
 .../samza/zk/ZkCoordinationServiceFactory.java  |   4 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  40 +-
 .../samza/zk/ZkJobCoordinatorFactory.java       |  10 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  21 +-
 .../samza/config/ShellCommandConfig.scala       |   3 +-
 .../apache/samza/container/SamzaContainer.scala |   8 +-
 .../samza/coordinator/JobModelManager.scala     |   6 +-
 .../samza/job/local/ProcessJobFactory.scala     |   2 +-
 .../samza/job/local/ThreadJobFactory.scala      |   6 +-
 .../clustermanager/MockContainerAllocator.java  |   2 +-
 .../clustermanager/TestContainerAllocator.java  |  28 +-
 .../TestContainerProcessManager.java            |  25 +-
 .../TestContainerRequestState.java              |  12 +-
 .../TestHostAwareContainerAllocator.java        |  45 +--
 .../samza/container/TestLocalityManager.java    |  12 +-
 .../grouper/task/TestGroupByContainerCount.java | 377 ++++++++++---------
 .../grouper/task/TestGroupByContainerIds.java   |  54 ++-
 .../grouper/task/TestTaskAssignmentManager.java |  45 +--
 .../samza/container/mock/ContainerMocks.java    |  18 +-
 .../model/TestSamzaObjectMapper.java            |  61 ++-
 .../zk/TestZkBarrierForVersionUpgrade.java      |  12 +-
 .../apache/samza/zk/TestZkLeaderElector.java    |   1 -
 .../java/org/apache/samza/zk/TestZkUtils.java   |  10 +-
 .../samza/container/TestSamzaContainer.scala    |  56 ++-
 .../samza/container/TestTaskInstance.scala      |  10 +-
 .../samza/coordinator/TestJobCoordinator.scala  |  13 +-
 .../samza/job/TestShellCommandBuilder.scala     |   4 +-
 .../samza/storage/kv/RocksDbKeyValueReader.java |   2 +-
 .../java/org/apache/samza/rest/model/Task.java  |  12 +-
 .../samza/rest/proxy/task/SamzaTaskProxy.java   |   4 +-
 .../samza/monitor/TestLocalStoreMonitor.java    |   4 +-
 .../rest/resources/mock/MockTaskProxy.java      |   8 +-
 .../performance/TestKeyValuePerformance.scala   |   2 +-
 .../test/processor/TestStreamProcessor.java     |  19 +-
 .../test/integration/StreamTaskTestUtil.scala   |   3 +-
 .../org/apache/samza/job/yarn/YarnAppState.java |   2 +-
 .../job/yarn/YarnClusterResourceManager.java    |  23 +-
 .../samza/job/yarn/YarnContainerRunner.java     |   4 +-
 .../samza/validation/YarnJobValidationTool.java |   6 +-
 70 files changed, 898 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index fd7333b..4076a51 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -28,7 +28,7 @@ import java.util.Collections;
  * A SamzaContainerContext maintains per-container information for the tasks it executes.
  */
 public class SamzaContainerContext {
-  public final int id;
+  public final String id;
   public final Config config;
   public final Collection<TaskName> taskNames;
 
@@ -40,7 +40,7 @@ public class SamzaContainerContext {
    * @param taskNames The set of taskName keys for which this container is responsible.
    */
   public SamzaContainerContext(
-      int id,
+      String id,
       Config config,
       Collection<TaskName> taskNames) {
     this.id = id;

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 6d46f5d..fc7438b 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -30,7 +30,7 @@ import org.apache.samza.config.Config;
  */
 public abstract class CommandBuilder {
   protected Config config;
-  protected int id;
+  protected String id;
   protected URL url;
   protected String commandPath;
 
@@ -61,7 +61,7 @@ public abstract class CommandBuilder {
    *          associated with a specific instantiation of a SamzaContainer.
    * @return self to support a builder style of use.
    */
-  public CommandBuilder setId(int id) {
+  public CommandBuilder setId(String id) {
     this.id = id;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
new file mode 100644
index 0000000..8790d69
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+@InterfaceStability.Evolving
+public interface ProcessorIdGenerator {
+  /**
+   * Generates a String representation to identify a single instance of StreamProcessor.
+   *
+   * This value can be representative of its current executing environment. It can also be custom-managed by the user,
+   * as long as it adheres to the specification below. More than one processor can co-exist within the same JVM,
+   * as long as their identifiers are guaranteed to be unique.
+   *
+   * <b>Specification of processor identifier</b>:
+   * <ul>
+   *  <li>Processor identifier has to be unique among the processors within a job</li>
+   *  <li>When more than one processor co-exists within the same JVM, the processor identifier can be of the format:
+   *  $x_$y, where 'x' is a unique identifier for the executing JVM and 'y' is a unique identifier for the
+   *  processor instance within the JVM. When there is only one processor within a JVM, 'x' should be sufficient to
+   *  uniquely identify the processor instance.</li>
+   * </ul>
+   *
+   * <b>Note</b>:
+   * In case of more than one processor within the same JVM, the custom implementation of ProcessorIdGenerator can
+   * contain a static counter, which is incremented on each call to generateProcessorId. The counter value can
+   * be treated as the identifier for the processor instance within the JVM.
+   *
+   * @param config Config instance
+   * @return String Identifier for the processor
+   */
+  String generateProcessorId(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index d47f217..b83d83c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -145,7 +145,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
 
     // Update state
     resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
-    int containerID = request.getContainerID();
+    String containerID = request.getContainerID();
 
     //run container on resource
     log.info("Found available resources on {}. Assigning request for container_id {} with "
@@ -176,9 +176,9 @@ public abstract class AbstractContainerAllocator implements Runnable {
    *                                - when host-affinity is not enabled, or
    *                                - when host-affinity is enabled and job is run for the first time
    */
-  public void requestResources(Map<Integer, String> resourceToHostMappings) {
-    for (Map.Entry<Integer, String> entry : resourceToHostMappings.entrySet()) {
-      int containerId = entry.getKey();
+  public void requestResources(Map<String, String> resourceToHostMappings) {
+    for (Map.Entry<String, String> entry : resourceToHostMappings.entrySet()) {
+      String containerId = entry.getKey();
       String preferredHost = entry.getValue();
       if (preferredHost == null)
         preferredHost = ResourceRequestState.ANY_HOST;
@@ -211,7 +211,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
    *                            this request
    * @param preferredHost Name of the host that you prefer to run the container on
    */
-  public final void requestResource(int containerID, String preferredHost) {
+  public final void requestResource(String containerID, String preferredHost) {
     SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
         preferredHost, containerID);
     resourceRequestState.addResourceRequest(request);
@@ -242,7 +242,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
    * @param samzaContainerId to configure the builder with.
    * @return the constructed builder object
    */
-  private CommandBuilder getCommandBuilder(int samzaContainerId) {
+  private CommandBuilder getCommandBuilder(String samzaContainerId) {
     String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
     CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b4309d9..9b5e871 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    * value is the {@link ResourceFailure} object that has a count of failures.
    *
    */
-  private final Map<Integer, ResourceFailure> containerFailures = new HashMap<>();
+  private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
 
   private final ContainerProcessManagerMetrics metrics;
 
@@ -173,7 +173,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     state.neededContainers.set(containerCount);
 
     // Request initial set of containers
-    Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
+    Map<String, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
 
     containerAllocator.requestResources(containerToHostMapping);
 
@@ -228,8 +228,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    */
   public void onResourceCompleted(SamzaResourceStatus containerStatus) {
     String containerIdStr = containerStatus.getResourceID();
-    int containerId = -1;
-    for (Map.Entry<Integer, SamzaResource> entry: state.runningContainers.entrySet()) {
+    String containerId = null;
+    for (Map.Entry<String, SamzaResource> entry: state.runningContainers.entrySet()) {
       if (entry.getValue().getResourceID().equals(containerStatus.getResourceID())) {
         log.info("Matching container ID found " + entry.getKey() + " " + entry.getValue());
 
@@ -237,10 +237,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         break;
       }
     }
-    if (containerId == -1) {
+    if (containerId == null) {
       log.info("No matching container id found for " + containerStatus.toString());
+    } else {
+      state.runningContainers.remove(containerId);
     }
-    state.runningContainers.remove(containerId);
 
     int exitStatus = containerStatus.getExitCode();
     switch (exitStatus) {
@@ -249,7 +250,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
         state.completedContainers.incrementAndGet();
 
-        if (containerId != -1) {
+        if (containerId != null) {
           state.finishedContainers.incrementAndGet();
           containerFailures.remove(containerId);
         }
@@ -275,7 +276,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         // clean up, and request a refactor container for the tasks. This only
         // should happen if the container was 'lost' due to node failure, not
         // if the AM released the container.
-        if (containerId != -1) {
+        if (containerId != null) {
           log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
 
           state.neededContainers.incrementAndGet();
@@ -295,7 +296,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.failedContainersStatus.put(containerIdStr, containerStatus);
         state.jobHealthy.set(false);
 
-        if (containerId != -1) {
+        if (containerId != null) {
           state.neededContainers.incrementAndGet();
           // Find out previously running container location
           String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index da73049..66e2246 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -64,7 +64,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
       SamzaResourceRequest request = peekPendingRequest();
       log.info("Handling request: " + request.getContainerID() + " " + request.getRequestTimestampMs() + " " + request.getPreferredHost());
       String preferredHost = request.getPreferredHost();
-      int containerID = request.getContainerID();
+      String containerID = request.getContainerID();
 
       if (hasAllocatedResource(preferredHost)) {
         // Found allocated container at preferredHost

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index cf91044..bde3fac 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -99,7 +99,7 @@ public class SamzaApplicationState {
    *  Map of the samzaContainerId to the {@link SamzaResource} on which it is running
    *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
    */
-  public final ConcurrentMap<Integer, SamzaResource> runningContainers = new ConcurrentHashMap<Integer, SamzaResource>(0);
+  public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
 
   /**
    * Final status of the application

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 3d1560f..4159ff2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -55,14 +55,14 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
   /**
    * The ID of the StreamProcessor which this request is for.
    */
-  private final int containerID;
+  private final String containerID;
 
   /**
    * The timestamp in millis when the request was created.
    */
   private final long requestTimestampMs;
 
-  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, int expectedContainerID) {
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String expectedContainerID) {
     this.numCores = numCores;
     this.memoryMB = memoryMB;
     this.preferredHost = preferredHost;
@@ -72,7 +72,7 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
     log.info("Resource Request created for {} on {} at {}", new Object[] {this.containerID, this.preferredHost, this.requestTimestampMs});
   }
 
-  public int getContainerID() {
+  public String getContainerID() {
     return containerID;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
new file mode 100644
index 0000000..708daa6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+/**
+ * Accessors for configs associated with the application scope.
+ */
+public class ApplicationConfig extends MapConfig {
+  /**
+   * <p>processor.id is similar to the logical containerId generated in Samza. However, in addition to identifying the JVM
+   * of the processor, it also contains a segment to identify the instance of the
+   * {@link org.apache.samza.processor.StreamProcessor} within the JVM. More detail can be found in
+   * {@link org.apache.samza.runtime.ProcessorIdGenerator}. </p>
+   * <p>
+   * This is an important distinction because Samza 0.13.0 in Yarn has a 1:1 mapping between the processor and the Yarn
+   * container (JVM). However, Samza in an embedded execution can contain more than one processor within the same JVM.
+   * </p>
+   * <b>Note:</b> This identifier has to be unique across the instances of StreamProcessors.
+   * TODO: Deprecated in 0.13. After 0.13, this id is generated using {@link org.apache.samza.runtime.ProcessorIdGenerator}
+   */
+  @Deprecated
+  public static final String PROCESSOR_ID = "processor.id";
+
+  /**
+   * Class implementing the {@link org.apache.samza.runtime.ProcessorIdGenerator} interface.
+   * Used to generate a unique identifier for a {@link org.apache.samza.processor.StreamProcessor} based on the runtime
+   * environment. Hence, the durability of the identifier is the same as the guarantees provided by the runtime environment.
+   */
+  public static final String APP_PROCESSOR_ID_GENERATOR_CLASS = "app.processor-id-generator.class";
+
+  public ApplicationConfig(Config config) {
+    super(config);
+  }
+
+  public String getAppProcessorIdGeneratorClass() {
+    return get(APP_PROCESSOR_ID_GENERATOR_CLASS, null);
+  }
+
+  @Deprecated
+  public String getProcessorId() {
+    return get(PROCESSOR_ID, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index a615d4f..22380d3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -37,7 +37,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  * */
 public class LocalityManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
-  private Map<Integer, Map<String, String>> containerToHostMapping = new HashMap<>();
+  private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
   private final TaskAssignmentManager taskAssignmentManager;
   private final boolean writeOnly;
 
@@ -92,23 +92,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    *
    * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
    */
-  public Map<Integer, Map<String, String>> readContainerLocality() {
+  public Map<String, Map<String, String>> readContainerLocality() {
     if (this.writeOnly) {
       throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
     }
 
-    Map<Integer, Map<String, String>> allMappings = new HashMap<>();
+    Map<String, Map<String, String>> allMappings = new HashMap<>();
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
       Map<String, String> localityMappings = new HashMap<>();
       localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
       localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
       localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
-      allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings);
+      allMappings.put(mapping.getKey(), localityMappings);
     }
     containerToHostMapping = Collections.unmodifiableMap(allMappings);
 
-    for (Map.Entry<Integer, Map<String, String>> entry : containerToHostMapping.entrySet()) {
+    for (Map.Entry<String, Map<String, String>> entry : containerToHostMapping.entrySet()) {
       log.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
     }
 
@@ -123,7 +123,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    * @param jmxAddress  the JMX URL address
    * @param jmxTunnelingAddress  the JMX Tunnel URL address
    */
-  public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
+  public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
     Map<String, String> existingMappings = containerToHostMapping.get(containerId);
     String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
     if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 5e6ccf8..246188e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
@@ -42,6 +44,8 @@ import org.slf4j.LoggerFactory;
  * happens to be). No consideration is given towards locality, even distribution
  * of aggregate SSPs within a container, even distribution of the number of
  * taskNames between containers, etc.
+ *
+ * TODO: SAMZA-1197 - need to modify balance to work with processorId strings
  */
 public class GroupByContainerCount implements BalancingTaskNameGrouper {
   private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
@@ -74,7 +78,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      containerModels.add(new ContainerModel(i, taskGroups[i]));
+      containerModels.add(new ContainerModel(String.valueOf(i), i, taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);
@@ -142,7 +146,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    *                              if the previous mapping doesn't exist or isn't usable.
    */
   private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) {
-    Map<String, Integer> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+    Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+    taskToContainerId.values().forEach(id -> {
+        try {
+          int intId = Integer.parseInt(id);
+        } catch (NumberFormatException nfe) {
+          throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
+        }
+      });
     if (taskToContainerId.isEmpty()) {
       log.info("No task assignment map was saved.");
       return null;
@@ -178,7 +189,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
   private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getContainerId());
+        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getProcessorId());
       }
     }
   }
@@ -211,7 +222,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
   private List<TaskGroup> createContainers(int startContainerId, int endContainerId) {
     List<TaskGroup> containers = new ArrayList<>(endContainerId - startContainerId);
     for (int i = startContainerId; i < endContainerId; i++) {
-      TaskGroup taskGroup = new TaskGroup(i, new ArrayList<String>());
+      TaskGroup taskGroup = new TaskGroup(String.valueOf(i), new ArrayList<String>());
       containers.add(taskGroup);
     }
     return containers;
@@ -225,10 +236,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    * @param taskNamesToAssign     the list of tasks to assign to the containers.
    * @param containers            the containers (as {@link TaskGroup}) to which the tasks will be assigned.
    */
+  // TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount)
   private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign,
       List<TaskGroup> containers) {
     for (TaskGroup taskGroup : containers) {
-      for (int j = taskGroup.size(); j < taskCountPerContainer[taskGroup.getContainerId()]; j++) {
+      for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) {
         String taskName = taskNamesToAssign.remove(0);
         taskGroup.addTaskName(taskName);
         log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
@@ -283,7 +295,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
         TaskModel model = taskNameToModel.get(taskName);
         containerTaskModels.put(model.getTaskName(), model);
       }
-      containerModels.add(new ContainerModel(container.containerId, containerTaskModels));
+      containerModels.add(
+          new ContainerModel(container.containerId, Integer.valueOf(container.containerId), containerTaskModels));
     }
     return Collections.unmodifiableSet(containerModels);
   }
@@ -294,14 +307,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    * @param taskToContainerId a map from each task name to the containerId to which it is assigned.
    * @return                  a list of TaskGroups ordered ascending by containerId.
    */
-  private List<TaskGroup> getOrderedContainers(Map<String, Integer> taskToContainerId) {
+  private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) {
     log.debug("Got task to container map: {}", taskToContainerId);
 
     // Group tasks by container Id
-    HashMap<Integer, List<String>> containerIdToTaskNames = new HashMap<>();
-    for (Map.Entry<String, Integer> entry : taskToContainerId.entrySet()) {
+    HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>();
+    for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) {
       String taskName = entry.getKey();
-      Integer containerId = entry.getValue();
+      String containerId = entry.getValue();
       List<String> taskNames = containerIdToTaskNames.get(containerId);
       if (taskNames == null) {
         taskNames = new ArrayList<>();
@@ -313,8 +326,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
     // Build container tasks
     List<TaskGroup> containerTasks = new ArrayList<>(containerIdToTaskNames.size());
     for (int i = 0; i < containerIdToTaskNames.size(); i++) {
-      if (containerIdToTaskNames.get(i) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
-      containerTasks.add(new TaskGroup(i, containerIdToTaskNames.get(i)));
+      if (containerIdToTaskNames.get(String.valueOf(i)) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
+      containerTasks.add(new TaskGroup(String.valueOf(i), containerIdToTaskNames.get(String.valueOf(i))));
     }
 
     return containerTasks;
@@ -327,15 +340,15 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    */
   private static class TaskGroup {
     private final List<String> taskNames = new LinkedList<>();
-    private final Integer containerId;
+    private final String containerId;
 
-    private TaskGroup(Integer containerId, List<String> taskNames) {
+    private TaskGroup(String containerId, List<String> taskNames) {
       this.containerId = containerId;
       Collections.sort(taskNames);        // For consistency because the taskNames came from a Map
       this.taskNames.addAll(taskNames);
     }
 
-    public Integer getContainerId() {
+    public String getContainerId() {
       return containerId;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 6d3f673..f2d88cd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -52,14 +52,14 @@ public class GroupByContainerIds implements TaskNameGrouper {
     if (startContainerCount > tasks.size())
       throw new IllegalArgumentException("number of containers="  + startContainerCount + " is bigger than number of tasks=" + tasks.size());
 
-    List<Integer> containerIds = new ArrayList<>(startContainerCount);
+    List<String> containerIds = new ArrayList<>(startContainerCount);
     for (int i = 0; i < startContainerCount; i++) {
-      containerIds.add(i);
+      containerIds.add(String.valueOf(i));
     }
     return group(tasks, containerIds);
   }
 
-  public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+  public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
     if (tasks.isEmpty())
       throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
           .toString(containersIds.toArray()));
@@ -89,7 +89,9 @@ public class GroupByContainerIds implements TaskNameGrouper {
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
+      // The containerId argument of the ContainerModel constructor is set to -1 because a processorId can be any
+      // string and need not have an integer equivalent. After 0.13, this parameter will be removed.
+      containerModels.add(new ContainerModel(containersIds.get(i), -1, taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
index 980f2a9..15cd224 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -33,14 +33,14 @@ import java.util.Set;
 public class SingleContainerGrouperFactory implements TaskNameGrouperFactory {
   @Override
   public TaskNameGrouper build(Config config) {
-    return new SingleContainerGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+    return new SingleContainerGrouper(config.get(JobConfig.PROCESSOR_ID()));
   }
 }
 
 class SingleContainerGrouper implements TaskNameGrouper {
-  private final int containerId;
+  private final String containerId;
 
-  SingleContainerGrouper(int containerId) {
+  SingleContainerGrouper(String containerId) {
     this.containerId = containerId;
   }
 
@@ -50,7 +50,7 @@ class SingleContainerGrouper implements TaskNameGrouper {
     for (TaskModel taskModel: taskModels) {
       taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
     }
-    ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap);
+    ContainerModel containerModel = new ContainerModel(containerId, -1, taskNameTaskModelMap);
     return Collections.singleton(containerModel);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 11207b2..d33a22b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * */
 public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
-  private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
+  private final Map<String, String> taskNameToContainerId = new HashMap<>();
   private boolean registered = false;
 
   /**
@@ -70,7 +70,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
    *
    * @return the map of taskName: containerId
    */
-  public Map<String, Integer> readTaskAssignment() {
+  public Map<String, String> readTaskAssignment() {
     taskNameToContainerId.clear();
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
       if (message.isDelete()) {
@@ -83,7 +83,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
       }
     }
 
-    for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) {
+    for (Map.Entry<String, String> entry : taskNameToContainerId.entrySet()) {
       log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
     }
 
@@ -96,8 +96,8 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
    * @param taskName    the task name
    * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
    */
-  public void writeTaskContainerMapping(String taskName, Integer containerId) {
-    Integer existingContainerId = taskNameToContainerId.get(taskName);
+  public void writeTaskContainerMapping(String taskName, String containerId) {
+    String existingContainerId = taskNameToContainerId.get(taskName);
     if (existingContainerId != null && !existingContainerId.equals(containerId)) {
       log.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
index d06bf62..71b80cc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -52,7 +52,7 @@ public interface TaskNameGrouper {
    */
   Set<ContainerModel> group(Set<TaskModel> tasks);
 
-  default Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
     return group(tasks);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index 252e56b..af2ef6a 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -23,9 +23,13 @@ import org.apache.samza.job.model.JobModel;
 
 /**
  *  A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the StreamProcessor.
- *  In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require
- *  coordination with JobCoordinators of other StreamProcessors.
- *  */
+ *
+ *  It is the responsibility of the JobCoordinator to assign a unique identifier to the StreamProcessor
+ *  based on the underlying environment. In some cases, ID assignment is completely config driven, while in other
+ *  cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors.
+ *
+ *  This interface contains methods required for the StreamProcessor to interact with JobCoordinator.
+ */
 @InterfaceStability.Evolving
 public interface JobCoordinator {
   /**
@@ -55,12 +59,16 @@ public interface JobCoordinator {
    * @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up
    */
   boolean awaitStart(long timeoutMs) throws InterruptedException;
+
   /**
-   * Returns the logical ID assigned to the processor
-   * It is up to the user to ensure that different instances of StreamProcessor within a job have unique processor ID.
-   * @return integer representing the logical processor ID
+   * Returns the identifier assigned to the processor that is local to the instance of StreamProcessor.
+   *
+   * The semantics and format of the identifier returned should adhere to the specification defined in
+   * {@link org.apache.samza.runtime.ProcessorIdGenerator}.
+   *
+   * @return String representing a unique logical processor ID
    */
-  int getProcessorId();
+  String getProcessorId();
 
   /**
    * Returns the current JobModel

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index d15bce1..8553f59 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -26,11 +26,10 @@ import org.apache.samza.processor.SamzaContainerController;
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
-   * @param processorId Unique identifier for the processor
    * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
    * @param containerController Controller interface for starting and stopping container. In future, it may simply
    *                            pause the container and add/remove tasks
    * @return An instance of IJobCoordinator
    */
-  JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
+  JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
index 431c05d..f8d4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
@@ -43,8 +43,9 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
   public static final String TYPE = "set-task-container-assignment";
   public static final String CONTAINER_KEY = "containerId";
 
+
   /**
-   * SteContainerToHostMapping is used to set the container to host mapping information.
+   * SetTaskContainerMapping is used to set the task to container mapping information.
    * @param message which holds the container to host information.
    */
   public SetTaskContainerMapping(CoordinatorStreamMessage message) {
@@ -64,8 +65,8 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
     putMessageValue(CONTAINER_KEY, containerId);
   }
 
-  public Integer getTaskAssignment() {
-    return Integer.parseInt(getMessageValue(CONTAINER_KEY));
+  public String getTaskAssignment() {
+    return getMessageValue(CONTAINER_KEY);
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index ed721b1..bd4fa94 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -19,9 +19,10 @@
 
 package org.apache.samza.job.model;
 
+import org.apache.samza.container.TaskName;
+
 import java.util.Collections;
 import java.util.Map;
-import org.apache.samza.container.TaskName;
 
 /**
  * <p>
@@ -35,34 +36,49 @@ import org.apache.samza.container.TaskName;
  * containers have tasks. Each data model contains relevant information, such as
  * an id, partition information, etc.
  * </p>
+ * <p>
+ * <b>Note</b>: This class no longer implements {@link Comparable}, so it does not define a natural ordering.
+ * </p>
  */
-public class ContainerModel implements Comparable<ContainerModel> {
+public class ContainerModel {
+  @Deprecated
   private final int containerId;
+  private final String processorId;
   private final Map<TaskName, TaskModel> tasks;
 
-  public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) {
+  public ContainerModel(String processorId, int containerId, Map<TaskName, TaskModel> tasks) {
     this.containerId = containerId;
+    if (processorId == null) {
+      this.processorId = String.valueOf(containerId);
+    } else {
+      this.processorId = processorId;
+    }
     this.tasks = Collections.unmodifiableMap(tasks);
   }
 
+  @Deprecated
   public int getContainerId() {
     return containerId;
   }
 
+  public String getProcessorId() {
+    return processorId;
+  }
+
   public Map<TaskName, TaskModel> getTasks() {
     return tasks;
   }
 
   @Override
   public String toString() {
-    return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]";
+    return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + "]";
   }
 
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + containerId;
+    result = prime * result + ((processorId == null) ? 0 : processorId.hashCode());
     result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
     return result;
   }
@@ -76,7 +92,7 @@ public class ContainerModel implements Comparable<ContainerModel> {
     if (getClass() != obj.getClass())
       return false;
     ContainerModel other = (ContainerModel) obj;
-    if (containerId != other.containerId)
+    if (!processorId.equals(other.processorId))
       return false;
     if (tasks == null) {
       if (other.tasks != null)
@@ -86,7 +102,4 @@ public class ContainerModel implements Comparable<ContainerModel> {
     return true;
   }
 
-  public int compareTo(ContainerModel other) {
-    return containerId - other.getContainerId();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dbd6dcc..dbb3867 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -41,24 +41,24 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 public class JobModel {
   private static final String EMPTY_STRING = "";
   private final Config config;
-  private final Map<Integer, ContainerModel> containers;
+  private final Map<String, ContainerModel> containers;
 
   private final LocalityManager localityManager;
-  private Map<Integer, String> localityMappings = new HashMap<Integer, String>();
+  private Map<String, String> localityMappings = new HashMap<String, String>();
 
   public int maxChangeLogStreamPartitions;
 
-  public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+  public JobModel(Config config, Map<String, ContainerModel> containers) {
     this(config, containers, null);
   }
 
-  public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) {
+  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
     this.localityManager = localityManager;
 
     if (localityManager == null) {
-      for (Integer containerId : containers.keySet()) {
+      for (String containerId : containers.keySet()) {
         localityMappings.put(containerId, null);
       }
     } else {
@@ -89,7 +89,7 @@ public class JobModel {
    * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
    * @return the value if it exists for a given container and key, otherwise an empty string
    */
-  public String getContainerToHostValue(Integer containerId, String key) {
+  public String getContainerToHostValue(String containerId, String key) {
     if (localityManager == null) {
       return EMPTY_STRING;
     }
@@ -103,12 +103,12 @@ public class JobModel {
     return mappings.get(key);
   }
 
-  public Map<Integer, String> getAllContainerToHostValues(String key) {
+  public Map<String, String> getAllContainerToHostValues(String key) {
     if (localityManager == null) {
       return Collections.EMPTY_MAP;
     }
-    Map<Integer, String> allValues = new HashMap<>();
-    for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
+    Map<String, String> allValues = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
       String value = entry.getValue().get(key);
       if (value != null) {
         allValues.put(entry.getKey(), value);
@@ -118,8 +118,8 @@ public class JobModel {
   }
 
   private void populateContainerLocalityMappings() {
-    Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
-    for (Integer containerId: containers.keySet()) {
+    Map<String, Map<String, String>> allMappings = localityManager.readContainerLocality();
+    for (String containerId: containers.keySet()) {
       if (allMappings.containsKey(containerId)) {
         localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
       } else {
@@ -128,14 +128,14 @@ public class JobModel {
     }
   }
 
-  public Map<Integer, String> getAllContainerLocality() {
+  public Map<String, String> getAllContainerLocality() {
     if (localityManager != null) {
       populateContainerLocalityMappings();
     }
     return localityMappings;
   }
 
-  public Map<Integer, ContainerModel> getContainers() {
+  public Map<String, ContainerModel> getContainers() {
     return containers;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index 76e2053..c292067 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeoutException;
 public class SamzaContainerController {
   private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class);
 
-  private final ExecutorService executorService;
+  private ExecutorService executorService;
   private volatile SamzaContainer container;
   private final Map<String, MetricsReporter> metricsReporterMap;
   private final Object taskFactory;
@@ -60,16 +60,12 @@ public class SamzaContainerController {
    * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
    *                            {@link org.apache.samza.task.AsyncStreamTask}
    * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
-   * @param processorId         Id of the processor
    * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
    */
   public SamzaContainerController(
       Object taskFactory,
       long containerShutdownMs,
-      String processorId,
       Map<String, MetricsReporter> metricsReporterMap) {
-    this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("p" + processorId + "-container-thread-%d").build());
     this.taskFactory = taskFactory;
     this.metricsReporterMap = metricsReporterMap;
     if (containerShutdownMs == -1) {
@@ -94,11 +90,11 @@ public class SamzaContainerController {
   public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
     LocalityManager localityManager = null;
     if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
-      localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config);
+      localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), config);
     }
-    log.info("About to create container: " + containerModel.getContainerId());
+    log.info("About to create container: " + containerModel.getProcessorId());
     container = SamzaContainer$.MODULE$.apply(
-        containerModel.getContainerId(),
+        containerModel.getProcessorId(),
         containerModel,
         config,
         maxChangelogStreamPartitions,
@@ -106,7 +102,9 @@ public class SamzaContainerController {
         new JmxServer(),
         Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
         taskFactory);
-    log.info("About to start container: " + containerModel.getContainerId());
+    log.info("About to start container: " + containerModel.getProcessorId());
+    executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+          .setNameFormat("p-" + containerModel.getProcessorId() + "-container-thread-%d").build());
     containerFuture = executorService.submit(() -> container.run());
   }
 
@@ -148,6 +146,8 @@ public class SamzaContainerController {
    */
   public void shutdown() {
     stopContainer();
-    executorService.shutdown();
+    if (executorService != null) {
+      executorService.shutdown();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index a39c3b9..5a8673a 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -19,9 +19,10 @@
 package org.apache.samza.processor;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -29,10 +30,7 @@ import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -60,17 +58,6 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class StreamProcessor {
-  private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
-  /**
-   * processor.id is equivalent to containerId in samza. It is a logical identifier used by Samza for a processor.
-   * In a distributed environment, this logical identifier is mapped to a physical identifier of the resource. For
-   * example, Yarn provides a "containerId" for every resource it allocates.
-   * In an embedded environment, this identifier is provided by the user by directly using the StreamProcessor API.
-   * <p>
-   * <b>Note:</b>This identifier has to be unique across the instances of StreamProcessors.
-   */
-  private static final String PROCESSOR_ID = "processor.id";
-  private final int processorId;
   private final JobCoordinator jobCoordinator;
 
   /**
@@ -83,51 +70,49 @@ public class StreamProcessor {
    * <p>
    * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
    *
-   * @param processorId            Unique identifier for a processor within the job. It has the same semantics as
-   *                               "containerId" in Samza
    * @param config                 Instance of config object - contains all configuration required for processing
    * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
    */
-  public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          AsyncStreamTaskFactory asyncStreamTaskFactory) {
-    this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
+    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory);
   }
 
 
   /**
-   *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+   * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory)}, except task instances are created
    * using the provided {@link StreamTaskFactory}.
-   * @param processorId - this processor Id
    * @param config - config
    * @param customMetricsReporters metric Reporter
    * @param streamTaskFactory task factory to instantiate the Task
    */
-  public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory) {
-    this(processorId, config, customMetricsReporters, (Object) streamTaskFactory);
+    this(config, customMetricsReporters, (Object) streamTaskFactory);
   }
 
-  private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                           Object taskFactory) {
-    this.processorId = processorId;
-
-    Map<String, String> updatedConfigMap = new HashMap<>();
-    updatedConfigMap.putAll(config);
-    updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
-    Config updatedConfig = new MapConfig(updatedConfigMap);
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    if (applicationConfig.getProcessorId() == null &&
+        applicationConfig.getAppProcessorIdGeneratorClass() == null) {
+      throw new ConfigException(
+          String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
 
     SamzaContainerController containerController = new SamzaContainerController(
         taskFactory,
-        new TaskConfigJava(updatedConfig).getShutdownMs(),
-        String.valueOf(processorId),
+        new TaskConfigJava(config).getShutdownMs(),
         customMetricsReporters);
 
     this.jobCoordinator = Util.
         <JobCoordinatorFactory>getObj(
-            new JobCoordinatorConfig(updatedConfig)
+            new JobCoordinatorConfig(config)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, updatedConfig, containerController);
+        .getJobCoordinator(config, containerController);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 49c3228..d790fb1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -52,9 +52,9 @@ import org.slf4j.LoggerFactory;
 public class LocalContainerRunner extends AbstractApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
   private final JobModel jobModel;
-  private final int containerId;
+  private final String containerId;
 
-  public LocalContainerRunner(JobModel jobModel, int containerId) {
+  public LocalContainerRunner(JobModel jobModel, String containerId) {
     super(jobModel.getConfig());
     this.jobModel = jobModel;
     this.containerId = containerId;
@@ -69,13 +69,13 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
       Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerModel.getContainerId(),
+          containerModel.getProcessorId(),
           containerModel,
           config,
           jobModel.maxChangeLogStreamPartitions,
           SamzaContainer.getLocalityManager(containerId, config),
           jmxServer,
-          Util.javaMapAsScalaMap(new HashMap<String, MetricsReporter>()),
+          Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
           taskFactory);
 
       container.run();
@@ -104,7 +104,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
         System.exit(1);
       });
 
-    Integer containerId = Integer.valueOf(System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
-    log.info(String.format("Got container ID: %d", containerId));
+    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+    log.info(String.format("Got container ID: %s", containerId));
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
     log.info(String.format("Got coordinator URL: %s", coordinatorUrl));

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
new file mode 100644
index 0000000..afc20b1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+import java.util.UUID;
+
+public class UUIDGenerator implements ProcessorIdGenerator {
+  /**
+   * Generates a String representation to identify the processor instance
+   * This value can be representative of its current executing environment. It can also be custom-managed by the user.
+   * <p>
+   * <b>Note</b>: When more than one processor exists within the same JVM, there is no need to use a static counter in
+   * this generator to adhere to the "$x_$y" format specified in {@link ProcessorIdGenerator} since each UUID is already
+   * unique by itself
+   *
+   * @param config Config instance
+   * @return String Identifier for the processor
+   */
+  @Override
+  public String generateProcessorId(Config config) {
+    return UUID.randomUUID().toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
index f197a95..e19afec 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -19,23 +19,28 @@
 
 package org.apache.samza.serializers.model;
 
-import java.util.Map;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskModel;
 import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import java.util.Map;
+
 /**
  * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
  */
 public abstract class JsonContainerModelMixIn {
   @JsonCreator
-  public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
+  public JsonContainerModelMixIn(@JsonProperty("processor-id") String processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
   }
 
+  @Deprecated
   @JsonProperty("container-id")
   abstract int getContainerId();
 
+  @JsonProperty("processor-id")
+  abstract String getProcessorId();
+
   @JsonProperty("tasks")
   abstract Map<TaskName, TaskModel> getTasks();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
index 037b5e2..4b0c404 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -30,12 +30,12 @@ import org.codehaus.jackson.annotate.JsonProperty;
  */
 public abstract class JsonJobModelMixIn {
   @JsonCreator
-  public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) {
+  public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<String, ContainerModel> containers) {
   }
 
   @JsonProperty("config")
   abstract Config getConfig();
 
   @JsonProperty("containers")
-  abstract Map<Integer, ContainerModel> getContainers();
+  abstract Map<String, ContainerModel> getContainers();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 83e6b8c..f8c4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -19,10 +19,8 @@
 
 package org.apache.samza.serializers.model;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
@@ -50,6 +48,10 @@ import org.codehaus.jackson.map.introspect.AnnotatedMethod;
 import org.codehaus.jackson.map.module.SimpleModule;
 import org.codehaus.jackson.type.TypeReference;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * <p>
  * A collection of utility classes and (de)serializers to make Samza's job model
@@ -89,10 +91,30 @@ public class SamzaObjectMapper {
     mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
     mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
-    mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
     mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
 
+    module.addDeserializer(ContainerModel.class, new JsonDeserializer<ContainerModel>() {
+      @Override
+      public ContainerModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+        ObjectCodec oc = jp.getCodec();
+        JsonNode node = oc.readTree(jp);
+        // Validate before dereferencing: calling getIntValue() on a missing node would NPE ahead of this check.
+        JsonNode containerIdNode = node.get("container-id");
+        if (containerIdNode == null) {
+          throw new SamzaException("JobModel did not contain a container-id. This can never happen. JobModel corrupt!");
+        }
+        int containerId = containerIdNode.getIntValue();
+        // "processor-id" is optional; when absent, fall back to the stringified container-id.
+        JsonNode processorIdNode = node.get("processor-id");
+        String processorId =
+            (processorIdNode == null) ? String.valueOf(containerId) : processorIdNode.getTextValue();
+        Map<TaskName, TaskModel> tasksMapping =
+            OBJECT_MAPPER.readValue(node.get("tasks"), new TypeReference<Map<TaskName, TaskModel>>() { });
+        return new ContainerModel(processorId, containerId, tasksMapping);
+      }
+    });
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index b2927f4..7efc6df 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -19,22 +19,25 @@
 package org.apache.samza.standalone;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -63,28 +66,37 @@ import java.util.Map;
  * */
 public class StandaloneJobCoordinator implements JobCoordinator {
   private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
-  private final int processorId;
+  private final String processorId;
   private final Config config;
   private final JobModel jobModel;
   private final SamzaContainerController containerController;
 
   @VisibleForTesting
   StandaloneJobCoordinator(
-      int processorId,
+      ProcessorIdGenerator processorIdGenerator,
       Config config,
       SamzaContainerController containerController,
       JobModel jobModel) {
-    this.processorId = processorId;
+    this.processorId = processorIdGenerator.generateProcessorId(config);
     this.config = config;
     this.containerController = containerController;
     this.jobModel = jobModel;
   }
 
-  public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
-    this.processorId = processorId;
+  public StandaloneJobCoordinator(Config config, SamzaContainerController containerController) {
     this.config = config;
     this.containerController = containerController;
 
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    String configuredProcessorId = appConfig.getProcessorId();  // read once; an explicitly configured id wins
+    if (configuredProcessorId != null) {     // TODO: This check to be removed after 0.13+
+      this.processorId = configuredProcessorId;
+    } else {
+      ProcessorIdGenerator idGenerator = ClassLoaderHelper.fromClassName(
+          appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      this.processorId = idGenerator.generateProcessorId(config);
+    }
+
     JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
     Map<String, SystemAdmin> systemAdmins = new HashMap<>();
     for (String systemName: systemConfig.getSystemNames()) {
@@ -113,7 +125,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
     // No-op
     JobModel jobModel = getJobModel();
     containerController.startContainer(
-        jobModel.getContainers().get(processorId),
+        jobModel.getContainers().get(getProcessorId()),
         jobModel.getConfig(),
         jobModel.maxChangeLogStreamPartitions);
   }
@@ -137,8 +149,8 @@ public class StandaloneJobCoordinator implements JobCoordinator {
   }
 
   @Override
-  public int getProcessorId() {
-    return this.processorId;
+  public String getProcessorId() {
+    return processorId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 7ca85c0..eada6e9 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -25,7 +25,7 @@ import org.apache.samza.processor.SamzaContainerController;
 
 public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
-    return new StandaloneJobCoordinator(processorId, config, containerController);
+  public JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController) {
+    return new StandaloneJobCoordinator(config, containerController);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 9471a23..e50f221 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -65,7 +65,7 @@ public class StorageRecovery extends CommandLine {
   private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
   private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
   private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
-  private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>();
+  private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>();
   private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
   private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
 
@@ -211,7 +211,7 @@ public class StorageRecovery extends CommandLine {
 
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
-      SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks()
+      SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks()
           .keySet());
 
       for (TaskModel taskModel : containerModel.getTasks().values()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
index f2b389b..3680b4f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
@@ -19,6 +19,10 @@
 
 package org.apache.samza.util;
 
+import org.apache.samza.config.ConfigException;
+
+import java.lang.reflect.Constructor;
+
 public class ClassLoaderHelper {
 
   public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -26,4 +30,34 @@ public class ClassLoaderHelper {
     T instance = clazz.newInstance();
     return instance;
   }
+
+  /**
+   * Loads the class named {@code className}, verifies that it is assignable to {@code classType},
+   * and instantiates it through its public no-argument constructor.
+   *
+   * @param className fully-qualified name of the class to instantiate
+   * @param classType type the loaded class is required to be assignable to
+   * @param <T> type of the returned instance
+   * @return a new instance of {@code className}
+   * @throws ConfigException if the class is not assignable to {@code classType}, or if loading or
+   *                         instantiating it fails (the underlying failure is kept as the cause)
+   */
+  public static <T> T fromClassName(String className, Class<T> classType) {
+    try {
+      Class<?> loadedClass = Class.forName(className);
+      if (!classType.isAssignableFrom(loadedClass)) {
+        throw new ConfigException(String.format(
+            "Class %s is not of type %s", className, classType));
+      }
+      Constructor<?> constructor = loadedClass.getConstructor();
+      // Class#cast performs a checked cast, so no unchecked (T) conversion is needed.
+      return classType.cast(constructor.newInstance());
+    } catch (ConfigException e) {
+      // The type-mismatch error raised above: rethrow it as-is instead of wrapping it a second time.
+      throw e;
+    } catch (Exception e) {
+      throw new ConfigException(String.format(
+          "Problem in loading %s class %s", classType, className), e);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index cc454e3..21a6b03 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -26,12 +26,10 @@ import org.apache.samza.coordinator.CoordinationServiceFactory;
 
 
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
-
-
   synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
     ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-    ZkUtils zkUtils = new ZkUtils(participantId, new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
     return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer);
   }