You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/07 22:22:30 UTC

[3/3] samza git commit: SAMZA-1126 - Semantics of processorId in Samza

SAMZA-1126 - Semantics of processorId in Samza

Implementation based on [SEP-1](https://cwiki.apache.org/confluence/display/SAMZA/SEP-1%3A+Semantics+of+ProcessorId+in+Samza)

Author: navina <na...@apache.org>

Reviewers: Yi Pan <ni...@gmail.com>, Jacob Maes <jm...@linkedin.com>, Jagadish <ja...@apache.org>

Closes #103 from navina/SAMZA-1126


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a7da1840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a7da1840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a7da1840

Branch: refs/heads/master
Commit: a7da1840f17a182b49c30451f35991c97fc51068
Parents: c74722b
Author: Navina Ramesh <na...@apache.org>
Authored: Fri Apr 7 15:22:13 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Fri Apr 7 15:22:13 2017 -0700

----------------------------------------------------------------------
 .../samza/container/SamzaContainerContext.java  |   4 +-
 .../org/apache/samza/job/CommandBuilder.java    |   4 +-
 .../samza/runtime/ProcessorIdGenerator.java     |  51 +++
 .../AbstractContainerAllocator.java             |  12 +-
 .../clustermanager/ContainerProcessManager.java |  19 +-
 .../HostAwareContainerAllocator.java            |   2 +-
 .../clustermanager/SamzaApplicationState.java   |   2 +-
 .../clustermanager/SamzaResourceRequest.java    |   6 +-
 .../apache/samza/config/ApplicationConfig.java  |  60 +++
 .../apache/samza/container/LocalityManager.java |  12 +-
 .../grouper/task/GroupByContainerCount.java     |  43 ++-
 .../grouper/task/GroupByContainerIds.java       |  10 +-
 .../task/SingleContainerGrouperFactory.java     |   8 +-
 .../grouper/task/TaskAssignmentManager.java     |  10 +-
 .../container/grouper/task/TaskNameGrouper.java |   2 +-
 .../samza/coordinator/JobCoordinator.java       |  22 +-
 .../coordinator/JobCoordinatorFactory.java      |   3 +-
 .../messages/SetTaskContainerMapping.java       |   7 +-
 .../apache/samza/job/model/ContainerModel.java  |  31 +-
 .../org/apache/samza/job/model/JobModel.java    |  26 +-
 .../processor/SamzaContainerController.java     |  20 +-
 .../apache/samza/processor/StreamProcessor.java |  53 +--
 .../samza/runtime/LocalContainerRunner.java     |  10 +-
 .../org/apache/samza/runtime/UUIDGenerator.java |  41 ++
 .../model/JsonContainerModelMixIn.java          |   9 +-
 .../serializers/model/JsonJobModelMixIn.java    |   4 +-
 .../serializers/model/SamzaObjectMapper.java    |  30 +-
 .../standalone/StandaloneJobCoordinator.java    |  30 +-
 .../StandaloneJobCoordinatorFactory.java        |   4 +-
 .../apache/samza/storage/StorageRecovery.java   |   4 +-
 .../apache/samza/util/ClassLoaderHelper.java    |  19 +
 .../samza/zk/ZkCoordinationServiceFactory.java  |   4 +-
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  40 +-
 .../samza/zk/ZkJobCoordinatorFactory.java       |  10 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  21 +-
 .../samza/config/ShellCommandConfig.scala       |   3 +-
 .../apache/samza/container/SamzaContainer.scala |   8 +-
 .../samza/coordinator/JobModelManager.scala     |   6 +-
 .../samza/job/local/ProcessJobFactory.scala     |   2 +-
 .../samza/job/local/ThreadJobFactory.scala      |   6 +-
 .../clustermanager/MockContainerAllocator.java  |   2 +-
 .../clustermanager/TestContainerAllocator.java  |  28 +-
 .../TestContainerProcessManager.java            |  25 +-
 .../TestContainerRequestState.java              |  12 +-
 .../TestHostAwareContainerAllocator.java        |  45 +--
 .../samza/container/TestLocalityManager.java    |  12 +-
 .../grouper/task/TestGroupByContainerCount.java | 377 ++++++++++---------
 .../grouper/task/TestGroupByContainerIds.java   |  54 ++-
 .../grouper/task/TestTaskAssignmentManager.java |  45 +--
 .../samza/container/mock/ContainerMocks.java    |  18 +-
 .../model/TestSamzaObjectMapper.java            |  61 ++-
 .../zk/TestZkBarrierForVersionUpgrade.java      |  12 +-
 .../apache/samza/zk/TestZkLeaderElector.java    |   1 -
 .../java/org/apache/samza/zk/TestZkUtils.java   |  10 +-
 .../samza/container/TestSamzaContainer.scala    |  56 ++-
 .../samza/container/TestTaskInstance.scala      |  10 +-
 .../samza/coordinator/TestJobCoordinator.scala  |  13 +-
 .../samza/job/TestShellCommandBuilder.scala     |   4 +-
 .../samza/storage/kv/RocksDbKeyValueReader.java |   2 +-
 .../java/org/apache/samza/rest/model/Task.java  |  12 +-
 .../samza/rest/proxy/task/SamzaTaskProxy.java   |   4 +-
 .../samza/monitor/TestLocalStoreMonitor.java    |   4 +-
 .../rest/resources/mock/MockTaskProxy.java      |   8 +-
 .../performance/TestKeyValuePerformance.scala   |   2 +-
 .../test/processor/TestStreamProcessor.java     |  19 +-
 .../test/integration/StreamTaskTestUtil.scala   |   3 +-
 .../org/apache/samza/job/yarn/YarnAppState.java |   2 +-
 .../job/yarn/YarnClusterResourceManager.java    |  23 +-
 .../samza/job/yarn/YarnContainerRunner.java     |   4 +-
 .../samza/validation/YarnJobValidationTool.java |   6 +-
 70 files changed, 898 insertions(+), 634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
index fd7333b..4076a51 100644
--- a/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
+++ b/samza-api/src/main/java/org/apache/samza/container/SamzaContainerContext.java
@@ -28,7 +28,7 @@ import java.util.Collections;
  * A SamzaContainerContext maintains per-container information for the tasks it executes.
  */
 public class SamzaContainerContext {
-  public final int id;
+  public final String id;
   public final Config config;
   public final Collection<TaskName> taskNames;
 
@@ -40,7 +40,7 @@ public class SamzaContainerContext {
    * @param taskNames The set of taskName keys for which this container is responsible.
    */
   public SamzaContainerContext(
-      int id,
+      String id,
       Config config,
       Collection<TaskName> taskNames) {
     this.id = id;

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
index 6d46f5d..fc7438b 100644
--- a/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
+++ b/samza-api/src/main/java/org/apache/samza/job/CommandBuilder.java
@@ -30,7 +30,7 @@ import org.apache.samza.config.Config;
  */
 public abstract class CommandBuilder {
   protected Config config;
-  protected int id;
+  protected String id;
   protected URL url;
   protected String commandPath;
 
@@ -61,7 +61,7 @@ public abstract class CommandBuilder {
    *          associated with a specific instantiation of a SamzaContainer.
    * @return self to support a builder style of use.
    */
-  public CommandBuilder setId(int id) {
+  public CommandBuilder setId(String id) {
     this.id = id;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
new file mode 100644
index 0000000..8790d69
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ProcessorIdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+@InterfaceStability.Evolving
+public interface ProcessorIdGenerator {
+  /**
+   * Generates a String representation to identify a single instance of StreamProcessor.
+   *
+   * This value can be representative of its current executing environment. It can also be custom-managed by the user,
+   * as long as it adheres to the specification below. More than one processor can co-exist within the same JVM,
+   * as long as their identifiers are guaranteed to be unique.
+   *
+   * <b>Specification of processor identifier</b>:
+   * <ul>
+   *  <li>Processor identifier has to be unique among the processors within a job</li>
+   *  <li>When more than one processor co-exists within the same JVM, the processor identifier can be of the format:
+   *  $x_$y, where 'x' is a unique identifier for the executing JVM and 'y' is a unique identifier for the
+   *  processor instance within the JVM. When there is only one processor within a JVM, 'x' should be sufficient to
+   *  uniquely identify the processor instance.</li>
+   * </ul>
+   *
+   * <b>Note</b>:
+   * In case of more than one processor within the same JVM, the custom implementation of ProcessorIdGenerator can
+   * contain a static counter, which is incremented on each call to generateProcessorId. The counter value can
+   * be treated as the identifier for the processor instance within the JVM.
+   *
+   * @param config Config instance
+   * @return String Identifier for the processor
+   */
+  String generateProcessorId(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index d47f217..b83d83c 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -145,7 +145,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
 
     // Update state
     resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
-    int containerID = request.getContainerID();
+    String containerID = request.getContainerID();
 
     //run container on resource
     log.info("Found available resources on {}. Assigning request for container_id {} with "
@@ -176,9 +176,9 @@ public abstract class AbstractContainerAllocator implements Runnable {
    *                                - when host-affinity is not enabled, or
    *                                - when host-affinity is enabled and job is run for the first time
    */
-  public void requestResources(Map<Integer, String> resourceToHostMappings) {
-    for (Map.Entry<Integer, String> entry : resourceToHostMappings.entrySet()) {
-      int containerId = entry.getKey();
+  public void requestResources(Map<String, String> resourceToHostMappings) {
+    for (Map.Entry<String, String> entry : resourceToHostMappings.entrySet()) {
+      String containerId = entry.getKey();
       String preferredHost = entry.getValue();
       if (preferredHost == null)
         preferredHost = ResourceRequestState.ANY_HOST;
@@ -211,7 +211,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
    *                            this request
    * @param preferredHost Name of the host that you prefer to run the container on
    */
-  public final void requestResource(int containerID, String preferredHost) {
+  public final void requestResource(String containerID, String preferredHost) {
     SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb,
         preferredHost, containerID);
     resourceRequestState.addResourceRequest(request);
@@ -242,7 +242,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
    * @param samzaContainerId to configure the builder with.
    * @return the constructed builder object
    */
-  private CommandBuilder getCommandBuilder(int samzaContainerId) {
+  private CommandBuilder getCommandBuilder(String samzaContainerId) {
     String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
     CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b4309d9..9b5e871 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -93,7 +93,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    * value is the {@link ResourceFailure} object that has a count of failures.
    *
    */
-  private final Map<Integer, ResourceFailure> containerFailures = new HashMap<>();
+  private final Map<String, ResourceFailure> containerFailures = new HashMap<>();
 
   private final ContainerProcessManagerMetrics metrics;
 
@@ -173,7 +173,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     state.neededContainers.set(containerCount);
 
     // Request initial set of containers
-    Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
+    Map<String, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
 
     containerAllocator.requestResources(containerToHostMapping);
 
@@ -228,8 +228,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
    */
   public void onResourceCompleted(SamzaResourceStatus containerStatus) {
     String containerIdStr = containerStatus.getResourceID();
-    int containerId = -1;
-    for (Map.Entry<Integer, SamzaResource> entry: state.runningContainers.entrySet()) {
+    String containerId = null;
+    for (Map.Entry<String, SamzaResource> entry: state.runningContainers.entrySet()) {
       if (entry.getValue().getResourceID().equals(containerStatus.getResourceID())) {
         log.info("Matching container ID found " + entry.getKey() + " " + entry.getValue());
 
@@ -237,10 +237,11 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         break;
       }
     }
-    if (containerId == -1) {
+    if (containerId == null) {
       log.info("No matching container id found for " + containerStatus.toString());
+    } else {
+      state.runningContainers.remove(containerId);
     }
-    state.runningContainers.remove(containerId);
 
     int exitStatus = containerStatus.getExitCode();
     switch (exitStatus) {
@@ -249,7 +250,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
 
         state.completedContainers.incrementAndGet();
 
-        if (containerId != -1) {
+        if (containerId != null) {
           state.finishedContainers.incrementAndGet();
           containerFailures.remove(containerId);
         }
@@ -275,7 +276,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         // clean up, and request a refactor container for the tasks. This only
         // should happen if the container was 'lost' due to node failure, not
         // if the AM released the container.
-        if (containerId != -1) {
+        if (containerId != null) {
           log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId);
 
           state.neededContainers.incrementAndGet();
@@ -295,7 +296,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         state.failedContainersStatus.put(containerIdStr, containerStatus);
         state.jobHealthy.set(false);
 
-        if (containerId != -1) {
+        if (containerId != null) {
           state.neededContainers.incrementAndGet();
           // Find out previously running container location
           String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index da73049..66e2246 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -64,7 +64,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator {
       SamzaResourceRequest request = peekPendingRequest();
       log.info("Handling request: " + request.getContainerID() + " " + request.getRequestTimestampMs() + " " + request.getPreferredHost());
       String preferredHost = request.getPreferredHost();
-      int containerID = request.getContainerID();
+      String containerID = request.getContainerID();
 
       if (hasAllocatedResource(preferredHost)) {
         // Found allocated container at preferredHost

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index cf91044..bde3fac 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -99,7 +99,7 @@ public class SamzaApplicationState {
    *  Map of the samzaContainerId to the {@link SamzaResource} on which it is running
    *  Modified by both the AMRMCallbackThread and the ContainerAllocator thread
    */
-  public final ConcurrentMap<Integer, SamzaResource> runningContainers = new ConcurrentHashMap<Integer, SamzaResource>(0);
+  public final ConcurrentMap<String, SamzaResource> runningContainers = new ConcurrentHashMap<String, SamzaResource>(0);
 
   /**
    * Final status of the application

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
index 3d1560f..4159ff2 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -55,14 +55,14 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
   /**
    * The ID of the StreamProcessor which this request is for.
    */
-  private final int containerID;
+  private final String containerID;
 
   /**
    * The timestamp in millis when the request was created.
    */
   private final long requestTimestampMs;
 
-  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, int expectedContainerID) {
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, String expectedContainerID) {
     this.numCores = numCores;
     this.memoryMB = memoryMB;
     this.preferredHost = preferredHost;
@@ -72,7 +72,7 @@ public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
     log.info("Resource Request created for {} on {} at {}", new Object[] {this.containerID, this.preferredHost, this.requestTimestampMs});
   }
 
-  public int getContainerID() {
+  public String getContainerID() {
     return containerID;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
new file mode 100644
index 0000000..708daa6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+/**
+ * Accessors for configs associated with Application scope
+ */
+public class ApplicationConfig extends MapConfig {
+  /**
+   * <p>processor.id is similar to the logical containerId generated in Samza. However, in addition to identifying the JVM
+   * of the processor, it also contains a segment to identify the instance of the
+   * {@link org.apache.samza.processor.StreamProcessor} within the JVM. More detail can be found in
+   * {@link org.apache.samza.runtime.ProcessorIdGenerator}. </p>
+   * <p>
+   * This is an important distinction because Samza 0.13.0 in Yarn has a 1:1 mapping between the processor and the Yarn
+   * container (JVM). However, Samza in an embedded execution can contain more than one processor within the same JVM.
+   * </p>
+   * <b>Note:</b> This identifier has to be unique across the instances of StreamProcessors.
+   * TODO: Deprecated in 0.13. After 0.13+, this id is generated using {@link org.apache.samza.runtime.ProcessorIdGenerator}
+   */
+  @Deprecated
+  public static final String PROCESSOR_ID = "processor.id";
+
+  /**
+   * Class implementing the {@link org.apache.samza.runtime.ProcessorIdGenerator} interface. It is used to generate a
+   * unique identifier for a {@link org.apache.samza.processor.StreamProcessor} based on the runtime environment.
+   * Hence, the durability of the identifier is the same as the guarantees provided by the runtime environment.
+   */
+  public static final String APP_PROCESSOR_ID_GENERATOR_CLASS = "app.processor-id-generator.class";
+
+  public ApplicationConfig(Config config) {
+    super(config);
+  }
+
+  public String getAppProcessorIdGeneratorClass() {
+    return get(APP_PROCESSOR_ID_GENERATOR_CLASS, null);
+  }
+
+  @Deprecated
+  public String getProcessorId() {
+    return get(PROCESSOR_ID, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index a615d4f..22380d3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -37,7 +37,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  * */
 public class LocalityManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
-  private Map<Integer, Map<String, String>> containerToHostMapping = new HashMap<>();
+  private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
   private final TaskAssignmentManager taskAssignmentManager;
   private final boolean writeOnly;
 
@@ -92,23 +92,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    *
    * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
    */
-  public Map<Integer, Map<String, String>> readContainerLocality() {
+  public Map<String, Map<String, String>> readContainerLocality() {
     if (this.writeOnly) {
       throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
     }
 
-    Map<Integer, Map<String, String>> allMappings = new HashMap<>();
+    Map<String, Map<String, String>> allMappings = new HashMap<>();
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
       Map<String, String> localityMappings = new HashMap<>();
       localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
       localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
       localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
-      allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings);
+      allMappings.put(mapping.getKey(), localityMappings);
     }
     containerToHostMapping = Collections.unmodifiableMap(allMappings);
 
-    for (Map.Entry<Integer, Map<String, String>> entry : containerToHostMapping.entrySet()) {
+    for (Map.Entry<String, Map<String, String>> entry : containerToHostMapping.entrySet()) {
       log.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
     }
 
@@ -123,7 +123,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    * @param jmxAddress  the JMX URL address
    * @param jmxTunnelingAddress  the JMX Tunnel URL address
    */
-  public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
+  public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
     Map<String, String> existingMappings = containerToHostMapping.get(containerId);
     String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
     if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
index 5e6ccf8..246188e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
@@ -42,6 +44,8 @@ import org.slf4j.LoggerFactory;
  * happens to be). No consideration is given towards locality, even distribution
  * of aggregate SSPs within a container, even distribution of the number of
  * taskNames between containers, etc.
+ *
+ * TODO: SAMZA-1197 - need to modify balance to work with processorId strings
  */
 public class GroupByContainerCount implements BalancingTaskNameGrouper {
   private static final Logger log = LoggerFactory.getLogger(GroupByContainerCount.class);
@@ -74,7 +78,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      containerModels.add(new ContainerModel(i, taskGroups[i]));
+      containerModels.add(new ContainerModel(String.valueOf(i), i, taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);
@@ -142,7 +146,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    *                              if the previous mapping doesn't exist or isn't usable.
    */
   private List<TaskGroup> getPreviousContainers(TaskAssignmentManager taskAssignmentManager, int taskCount) {
-    Map<String, Integer> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+    Map<String, String> taskToContainerId = taskAssignmentManager.readTaskAssignment();
+    taskToContainerId.values().forEach(id -> {
+        try {
+          int intId = Integer.parseInt(id);
+        } catch (NumberFormatException nfe) {
+          throw new SamzaException("GroupByContainerCount cannot handle non-integer processorIds!", nfe);
+        }
+      });
     if (taskToContainerId.isEmpty()) {
       log.info("No task assignment map was saved.");
       return null;
@@ -178,7 +189,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
   private void saveTaskAssignments(Set<ContainerModel> containers, TaskAssignmentManager taskAssignmentManager) {
     for (ContainerModel container : containers) {
       for (TaskName taskName : container.getTasks().keySet()) {
-        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getContainerId());
+        taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName(), container.getProcessorId());
       }
     }
   }
@@ -211,7 +222,7 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
   private List<TaskGroup> createContainers(int startContainerId, int endContainerId) {
     List<TaskGroup> containers = new ArrayList<>(endContainerId - startContainerId);
     for (int i = startContainerId; i < endContainerId; i++) {
-      TaskGroup taskGroup = new TaskGroup(i, new ArrayList<String>());
+      TaskGroup taskGroup = new TaskGroup(String.valueOf(i), new ArrayList<String>());
       containers.add(taskGroup);
     }
     return containers;
@@ -225,10 +236,11 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    * @param taskNamesToAssign     the list of tasks to assign to the containers.
    * @param containers            the containers (as {@link TaskGroup}) to which the tasks will be assigned.
    */
+  // TODO: Change logic from using int arrays to a Map<String, Integer> (id -> taskCount)
   private void assignTasksToContainers(int[] taskCountPerContainer, List<String> taskNamesToAssign,
       List<TaskGroup> containers) {
     for (TaskGroup taskGroup : containers) {
-      for (int j = taskGroup.size(); j < taskCountPerContainer[taskGroup.getContainerId()]; j++) {
+      for (int j = taskGroup.size(); j < taskCountPerContainer[Integer.valueOf(taskGroup.getContainerId())]; j++) {
         String taskName = taskNamesToAssign.remove(0);
         taskGroup.addTaskName(taskName);
         log.info("Assigned task {} to container {}", taskName, taskGroup.getContainerId());
@@ -283,7 +295,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
         TaskModel model = taskNameToModel.get(taskName);
         containerTaskModels.put(model.getTaskName(), model);
       }
-      containerModels.add(new ContainerModel(container.containerId, containerTaskModels));
+      containerModels.add(
+          new ContainerModel(container.containerId, Integer.valueOf(container.containerId), containerTaskModels));
     }
     return Collections.unmodifiableSet(containerModels);
   }
@@ -294,14 +307,14 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    * @param taskToContainerId a map from each task name to the containerId to which it is assigned.
    * @return                  a list of TaskGroups ordered ascending by containerId.
    */
-  private List<TaskGroup> getOrderedContainers(Map<String, Integer> taskToContainerId) {
+  private List<TaskGroup> getOrderedContainers(Map<String, String> taskToContainerId) {
     log.debug("Got task to container map: {}", taskToContainerId);
 
     // Group tasks by container Id
-    HashMap<Integer, List<String>> containerIdToTaskNames = new HashMap<>();
-    for (Map.Entry<String, Integer> entry : taskToContainerId.entrySet()) {
+    HashMap<String, List<String>> containerIdToTaskNames = new HashMap<>();
+    for (Map.Entry<String, String> entry : taskToContainerId.entrySet()) {
       String taskName = entry.getKey();
-      Integer containerId = entry.getValue();
+      String containerId = entry.getValue();
       List<String> taskNames = containerIdToTaskNames.get(containerId);
       if (taskNames == null) {
         taskNames = new ArrayList<>();
@@ -313,8 +326,8 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
     // Build container tasks
     List<TaskGroup> containerTasks = new ArrayList<>(containerIdToTaskNames.size());
     for (int i = 0; i < containerIdToTaskNames.size(); i++) {
-      if (containerIdToTaskNames.get(i) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
-      containerTasks.add(new TaskGroup(i, containerIdToTaskNames.get(i)));
+      if (containerIdToTaskNames.get(String.valueOf(i)) == null) throw new IllegalStateException("Task mapping is missing container: " + i);
+      containerTasks.add(new TaskGroup(String.valueOf(i), containerIdToTaskNames.get(String.valueOf(i))));
     }
 
     return containerTasks;
@@ -327,15 +340,15 @@ public class GroupByContainerCount implements BalancingTaskNameGrouper {
    */
   private static class TaskGroup {
     private final List<String> taskNames = new LinkedList<>();
-    private final Integer containerId;
+    private final String containerId;
 
-    private TaskGroup(Integer containerId, List<String> taskNames) {
+    private TaskGroup(String containerId, List<String> taskNames) {
       this.containerId = containerId;
       Collections.sort(taskNames);        // For consistency because the taskNames came from a Map
       this.taskNames.addAll(taskNames);
     }
 
-    public Integer getContainerId() {
+    public String getContainerId() {
       return containerId;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 6d3f673..f2d88cd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -52,14 +52,14 @@ public class GroupByContainerIds implements TaskNameGrouper {
     if (startContainerCount > tasks.size())
       throw new IllegalArgumentException("number of containers="  + startContainerCount + " is bigger than number of tasks=" + tasks.size());
 
-    List<Integer> containerIds = new ArrayList<>(startContainerCount);
+    List<String> containerIds = new ArrayList<>(startContainerCount);
     for (int i = 0; i < startContainerCount; i++) {
-      containerIds.add(i);
+      containerIds.add(String.valueOf(i));
     }
     return group(tasks, containerIds);
   }
 
-  public Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+  public Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
     if (tasks.isEmpty())
       throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
           .toString(containersIds.toArray()));
@@ -89,7 +89,9 @@ public class GroupByContainerIds implements TaskNameGrouper {
     // Convert to a Set of ContainerModel
     Set<ContainerModel> containerModels = new HashSet<>();
     for (int i = 0; i < containerCount; i++) {
-      containerModels.add(new ContainerModel(containersIds.get(i), taskGroups[i]));
+      // containerId in ContainerModel constructor is set to -1 because processorId can be any string and does
+      // not have an integer equivalent. So, we set it to -1. After 0.13, this parameter will be removed.
+      containerModels.add(new ContainerModel(containersIds.get(i), -1, taskGroups[i]));
     }
 
     return Collections.unmodifiableSet(containerModels);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
index 980f2a9..15cd224 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
@@ -33,14 +33,14 @@ import java.util.Set;
 public class SingleContainerGrouperFactory implements TaskNameGrouperFactory {
   @Override
   public TaskNameGrouper build(Config config) {
-    return new SingleContainerGrouper(config.getInt(JobConfig.PROCESSOR_ID()));
+    return new SingleContainerGrouper(config.get(JobConfig.PROCESSOR_ID()));
   }
 }
 
 class SingleContainerGrouper implements TaskNameGrouper {
-  private final int containerId;
+  private final String containerId;
 
-  SingleContainerGrouper(int containerId) {
+  SingleContainerGrouper(String containerId) {
     this.containerId = containerId;
   }
 
@@ -50,7 +50,7 @@ class SingleContainerGrouper implements TaskNameGrouper {
     for (TaskModel taskModel: taskModels) {
       taskNameTaskModelMap.put(taskModel.getTaskName(), taskModel);
     }
-    ContainerModel containerModel = new ContainerModel(containerId, taskNameTaskModelMap);
+    ContainerModel containerModel = new ContainerModel(containerId, -1, taskNameTaskModelMap);
     return Collections.singleton(containerModel);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 11207b2..d33a22b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * */
 public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
-  private final Map<String, Integer> taskNameToContainerId = new HashMap<>();
+  private final Map<String, String> taskNameToContainerId = new HashMap<>();
   private boolean registered = false;
 
   /**
@@ -70,7 +70,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
    *
    * @return the map of taskName: containerId
    */
-  public Map<String, Integer> readTaskAssignment() {
+  public Map<String, String> readTaskAssignment() {
     taskNameToContainerId.clear();
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
       if (message.isDelete()) {
@@ -83,7 +83,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
       }
     }
 
-    for (Map.Entry<String, Integer> entry : taskNameToContainerId.entrySet()) {
+    for (Map.Entry<String, String> entry : taskNameToContainerId.entrySet()) {
       log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
     }
 
@@ -96,8 +96,8 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager {
    * @param taskName    the task name
    * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
    */
-  public void writeTaskContainerMapping(String taskName, Integer containerId) {
-    Integer existingContainerId = taskNameToContainerId.get(taskName);
+  public void writeTaskContainerMapping(String taskName, String containerId) {
+    String existingContainerId = taskNameToContainerId.get(taskName);
     if (existingContainerId != null && !existingContainerId.equals(containerId)) {
       log.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
index d06bf62..71b80cc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java
@@ -52,7 +52,7 @@ public interface TaskNameGrouper {
    */
   Set<ContainerModel> group(Set<TaskModel> tasks);
 
-  default Set<ContainerModel> group(Set<TaskModel> tasks, List<Integer> containersIds) {
+  default Set<ContainerModel> group(Set<TaskModel> tasks, List<String> containersIds) {
     return group(tasks);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
index 252e56b..af2ef6a 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java
@@ -23,9 +23,13 @@ import org.apache.samza.job.model.JobModel;
 
 /**
  *  A JobCoordinator is a pluggable module in each process that provides the JobModel and the ID to the StreamProcessor.
- *  In some cases, ID assignment is completely config driven, while in other cases, ID assignment may require
- *  coordination with JobCoordinators of other StreamProcessors.
- *  */
+ *
+ *  It is the responsibility of the JobCoordinator to assign a unique identifier to the StreamProcessor
+ *  based on the underlying environment. In some cases, ID assignment is completely config driven, while in other
+ *  cases, ID assignment may require coordination with JobCoordinators of other StreamProcessors.
+ *
+ *  This interface contains methods required for the StreamProcessor to interact with JobCoordinator.
+ */
 @InterfaceStability.Evolving
 public interface JobCoordinator {
   /**
@@ -55,12 +59,16 @@ public interface JobCoordinator {
    * @throws InterruptedException if the current thread is interrupted while waiting for the JobCoordinator to start-up
    */
   boolean awaitStart(long timeoutMs) throws InterruptedException;
+
   /**
-   * Returns the logical ID assigned to the processor
-   * It is up to the user to ensure that different instances of StreamProcessor within a job have unique processor ID.
-   * @return integer representing the logical processor ID
+   * Returns the identifier assigned to the processor that is local to the instance of StreamProcessor.
+   *
+   * The semantics and format of the identifier returned should adhere to the specification defined in
+   * {@link org.apache.samza.runtime.ProcessorIdGenerator}
+   *
+   * @return String representing a unique logical processor ID
    */
-  int getProcessorId();
+  String getProcessorId();
 
   /**
    * Returns the current JobModel

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
index d15bce1..8553f59 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
@@ -26,11 +26,10 @@ import org.apache.samza.processor.SamzaContainerController;
 @InterfaceStability.Evolving
 public interface JobCoordinatorFactory {
   /**
-   * @param processorId Unique identifier for the processor
    * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
    * @param containerController Controller interface for starting and stopping container. In future, it may simply
    *                            pause the container and add/remove tasks
    * @return An instance of IJobCoordinator
    */
-  JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
+  JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
index 431c05d..f8d4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
@@ -43,8 +43,9 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
   public static final String TYPE = "set-task-container-assignment";
   public static final String CONTAINER_KEY = "containerId";
 
+
   /**
-   * SteContainerToHostMapping is used to set the container to host mapping information.
+   * SetTaskContainerMapping is used to set the task to container mapping information.
    * @param message which holds the container to host information.
    */
   public SetTaskContainerMapping(CoordinatorStreamMessage message) {
@@ -64,8 +65,8 @@ public class SetTaskContainerMapping extends CoordinatorStreamMessage {
     putMessageValue(CONTAINER_KEY, containerId);
   }
 
-  public Integer getTaskAssignment() {
-    return Integer.parseInt(getMessageValue(CONTAINER_KEY));
+  public String getTaskAssignment() {
+    return getMessageValue(CONTAINER_KEY);
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
index ed721b1..bd4fa94 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java
@@ -19,9 +19,10 @@
 
 package org.apache.samza.job.model;
 
+import org.apache.samza.container.TaskName;
+
 import java.util.Collections;
 import java.util.Map;
-import org.apache.samza.container.TaskName;
 
 /**
  * <p>
@@ -35,34 +36,49 @@ import org.apache.samza.container.TaskName;
  * containers have tasks. Each data model contains relevant information, such as
  * an id, partition information, etc.
  * </p>
+ * <p>
+ * <b>Note</b>: equals and hashCode are based on processorId and tasks; the deprecated containerId is ignored.
+ * </p>
  */
-public class ContainerModel implements Comparable<ContainerModel> {
+public class ContainerModel {
+  @Deprecated
   private final int containerId;
+  private final String processorId;
   private final Map<TaskName, TaskModel> tasks;
 
-  public ContainerModel(int containerId, Map<TaskName, TaskModel> tasks) {
+  public ContainerModel(String processorId, int containerId, Map<TaskName, TaskModel> tasks) {
     this.containerId = containerId;
+    if (processorId == null) {
+      this.processorId = String.valueOf(containerId);
+    } else {
+      this.processorId = processorId;
+    }
     this.tasks = Collections.unmodifiableMap(tasks);
   }
 
+  @Deprecated
   public int getContainerId() {
     return containerId;
   }
 
+  public String getProcessorId() {
+    return processorId;
+  }
+
   public Map<TaskName, TaskModel> getTasks() {
     return tasks;
   }
 
   @Override
   public String toString() {
-    return "ContainerModel [containerId=" + containerId + ", tasks=" + tasks + "]";
+    return "ContainerModel [processorId=" + processorId + ", tasks=" + tasks + "]";
   }
 
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + containerId;
+    result = prime * result + ((processorId == null) ? 0 : processorId.hashCode());
     result = prime * result + ((tasks == null) ? 0 : tasks.hashCode());
     return result;
   }
@@ -76,7 +92,7 @@ public class ContainerModel implements Comparable<ContainerModel> {
     if (getClass() != obj.getClass())
       return false;
     ContainerModel other = (ContainerModel) obj;
-    if (containerId != other.containerId)
+    if (!processorId.equals(other.processorId))
       return false;
     if (tasks == null) {
       if (other.tasks != null)
@@ -86,7 +102,4 @@ public class ContainerModel implements Comparable<ContainerModel> {
     return true;
   }
 
-  public int compareTo(ContainerModel other) {
-    return containerId - other.getContainerId();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index dbd6dcc..dbb3867 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -41,24 +41,24 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 public class JobModel {
   private static final String EMPTY_STRING = "";
   private final Config config;
-  private final Map<Integer, ContainerModel> containers;
+  private final Map<String, ContainerModel> containers;
 
   private final LocalityManager localityManager;
-  private Map<Integer, String> localityMappings = new HashMap<Integer, String>();
+  private Map<String, String> localityMappings = new HashMap<String, String>();
 
   public int maxChangeLogStreamPartitions;
 
-  public JobModel(Config config, Map<Integer, ContainerModel> containers) {
+  public JobModel(Config config, Map<String, ContainerModel> containers) {
     this(config, containers, null);
   }
 
-  public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) {
+  public JobModel(Config config, Map<String, ContainerModel> containers, LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
     this.localityManager = localityManager;
 
     if (localityManager == null) {
-      for (Integer containerId : containers.keySet()) {
+      for (String containerId : containers.keySet()) {
         localityMappings.put(containerId, null);
       }
     } else {
@@ -89,7 +89,7 @@ public class JobModel {
    * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
    * @return the value if it exists for a given container and key, otherwise an empty string
    */
-  public String getContainerToHostValue(Integer containerId, String key) {
+  public String getContainerToHostValue(String containerId, String key) {
     if (localityManager == null) {
       return EMPTY_STRING;
     }
@@ -103,12 +103,12 @@ public class JobModel {
     return mappings.get(key);
   }
 
-  public Map<Integer, String> getAllContainerToHostValues(String key) {
+  public Map<String, String> getAllContainerToHostValues(String key) {
     if (localityManager == null) {
       return Collections.EMPTY_MAP;
     }
-    Map<Integer, String> allValues = new HashMap<>();
-    for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
+    Map<String, String> allValues = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
       String value = entry.getValue().get(key);
       if (value != null) {
         allValues.put(entry.getKey(), value);
@@ -118,8 +118,8 @@ public class JobModel {
   }
 
   private void populateContainerLocalityMappings() {
-    Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
-    for (Integer containerId: containers.keySet()) {
+    Map<String, Map<String, String>> allMappings = localityManager.readContainerLocality();
+    for (String containerId: containers.keySet()) {
       if (allMappings.containsKey(containerId)) {
         localityMappings.put(containerId, allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
       } else {
@@ -128,14 +128,14 @@ public class JobModel {
     }
   }
 
-  public Map<Integer, String> getAllContainerLocality() {
+  public Map<String, String> getAllContainerLocality() {
     if (localityManager != null) {
       populateContainerLocalityMappings();
     }
     return localityMappings;
   }
 
-  public Map<Integer, ContainerModel> getContainers() {
+  public Map<String, ContainerModel> getContainers() {
     return containers;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index 76e2053..c292067 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -44,7 +44,7 @@ import java.util.concurrent.TimeoutException;
 public class SamzaContainerController {
   private static final Logger log = LoggerFactory.getLogger(SamzaContainerController.class);
 
-  private final ExecutorService executorService;
+  private ExecutorService executorService;
   private volatile SamzaContainer container;
   private final Map<String, MetricsReporter> metricsReporterMap;
   private final Object taskFactory;
@@ -60,16 +60,12 @@ public class SamzaContainerController {
    * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
    *                            {@link org.apache.samza.task.AsyncStreamTask}
    * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
-   * @param processorId         Id of the processor
    * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
    */
   public SamzaContainerController(
       Object taskFactory,
       long containerShutdownMs,
-      String processorId,
       Map<String, MetricsReporter> metricsReporterMap) {
-    this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("p" + processorId + "-container-thread-%d").build());
     this.taskFactory = taskFactory;
     this.metricsReporterMap = metricsReporterMap;
     if (containerShutdownMs == -1) {
@@ -94,11 +90,11 @@ public class SamzaContainerController {
   public void startContainer(ContainerModel containerModel, Config config, int maxChangelogStreamPartitions) {
     LocalityManager localityManager = null;
     if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
-      localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getContainerId(), config);
+      localityManager = SamzaContainer$.MODULE$.getLocalityManager(containerModel.getProcessorId(), config);
     }
-    log.info("About to create container: " + containerModel.getContainerId());
+    log.info("About to create container: " + containerModel.getProcessorId());
     container = SamzaContainer$.MODULE$.apply(
-        containerModel.getContainerId(),
+        containerModel.getProcessorId(),
         containerModel,
         config,
         maxChangelogStreamPartitions,
@@ -106,7 +102,9 @@ public class SamzaContainerController {
         new JmxServer(),
         Util.<String, MetricsReporter>javaMapAsScalaMap(metricsReporterMap),
         taskFactory);
-    log.info("About to start container: " + containerModel.getContainerId());
+    log.info("About to start container: " + containerModel.getProcessorId());
+    executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+          .setNameFormat("p-" + containerModel.getProcessorId() + "-container-thread-%d").build());
     containerFuture = executorService.submit(() -> container.run());
   }
 
@@ -148,6 +146,8 @@ public class SamzaContainerController {
    */
   public void shutdown() {
     stopContainer();
-    executorService.shutdown();
+    if (executorService != null) {
+      executorService.shutdown();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index a39c3b9..5a8673a 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -19,9 +19,10 @@
 package org.apache.samza.processor;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -29,10 +30,7 @@ import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -60,17 +58,6 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class StreamProcessor {
-  private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class);
-  /**
-   * processor.id is equivalent to containerId in samza. It is a logical identifier used by Samza for a processor.
-   * In a distributed environment, this logical identifier is mapped to a physical identifier of the resource. For
-   * example, Yarn provides a "containerId" for every resource it allocates.
-   * In an embedded environment, this identifier is provided by the user by directly using the StreamProcessor API.
-   * <p>
-   * <b>Note:</b>This identifier has to be unique across the instances of StreamProcessors.
-   */
-  private static final String PROCESSOR_ID = "processor.id";
-  private final int processorId;
   private final JobCoordinator jobCoordinator;
 
   /**
@@ -83,51 +70,49 @@ public class StreamProcessor {
    * <p>
    * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
    *
-   * @param processorId            Unique identifier for a processor within the job. It has the same semantics as
-   *                               "containerId" in Samza
    * @param config                 Instance of config object - contains all configuration required for processing
    * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
    * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
    */
-  public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          AsyncStreamTaskFactory asyncStreamTaskFactory) {
-    this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
+    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory);
   }
 
 
   /**
-   *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+   * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory)}, except task instances are created
    * using the provided {@link StreamTaskFactory}.
-   * @param processorId - this processor Id
    * @param config - config
    * @param customMetricsReporters metric Reporter
    * @param streamTaskFactory task factory to instantiate the Task
    */
-  public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory) {
-    this(processorId, config, customMetricsReporters, (Object) streamTaskFactory);
+    this(config, customMetricsReporters, (Object) streamTaskFactory);
   }
 
-  private StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
+  private StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                           Object taskFactory) {
-    this.processorId = processorId;
-
-    Map<String, String> updatedConfigMap = new HashMap<>();
-    updatedConfigMap.putAll(config);
-    updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
-    Config updatedConfig = new MapConfig(updatedConfigMap);
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig applicationConfig = new ApplicationConfig(config);
+    if (applicationConfig.getProcessorId() == null &&
+        applicationConfig.getAppProcessorIdGeneratorClass() == null) {
+      throw new ConfigException(
+          String.format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
 
     SamzaContainerController containerController = new SamzaContainerController(
         taskFactory,
-        new TaskConfigJava(updatedConfig).getShutdownMs(),
-        String.valueOf(processorId),
+        new TaskConfigJava(config).getShutdownMs(),
         customMetricsReporters);
 
     this.jobCoordinator = Util.
         <JobCoordinatorFactory>getObj(
-            new JobCoordinatorConfig(updatedConfig)
+            new JobCoordinatorConfig(config)
                 .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(processorId, updatedConfig, containerController);
+        .getJobCoordinator(config, containerController);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 49c3228..d790fb1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -52,9 +52,9 @@ import org.slf4j.LoggerFactory;
 public class LocalContainerRunner extends AbstractApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
   private final JobModel jobModel;
-  private final int containerId;
+  private final String containerId;
 
-  public LocalContainerRunner(JobModel jobModel, int containerId) {
+  public LocalContainerRunner(JobModel jobModel, String containerId) {
     super(jobModel.getConfig());
     this.jobModel = jobModel;
     this.containerId = containerId;
@@ -69,13 +69,13 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
       Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
       SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerModel.getContainerId(),
+          containerModel.getProcessorId(),
           containerModel,
           config,
           jobModel.maxChangeLogStreamPartitions,
           SamzaContainer.getLocalityManager(containerId, config),
           jmxServer,
-          Util.javaMapAsScalaMap(new HashMap<String, MetricsReporter>()),
+          Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
           taskFactory);
 
       container.run();
@@ -104,7 +104,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
         System.exit(1);
       });
 
-    Integer containerId = Integer.valueOf(System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
-    log.info(String.format("Got container ID: %d", containerId));
+    String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+    log.info(String.format("Got container ID: %s", containerId));
     String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
     log.info(String.format("Got coordinator URL: %s", coordinatorUrl));

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
new file mode 100644
index 0000000..afc20b1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/UUIDGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+import java.util.UUID;
+
+public class UUIDGenerator implements ProcessorIdGenerator {
+  /**
+   * Generates a processor identifier by returning the string form of a freshly created random {@link UUID}.
+   * The supplied config is not consulted by this implementation.
+   * <p>
+   * <b>Note</b>: When more than one processor exists within the same JVM, there is no need to use a static counter in
+   * this generator to adhere to the "$x_$y" format specified in {@link ProcessorIdGenerator}, since each UUID is already
+   * unique by itself.
+   *
+   * @param config Config instance; unused here
+   * @return the random UUID rendered as a String, used as the identifier for the processor
+   */
+  @Override
+  public String generateProcessorId(Config config) {
+    return UUID.randomUUID().toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
index f197a95..e19afec 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonContainerModelMixIn.java
@@ -19,23 +19,28 @@
 
 package org.apache.samza.serializers.model;
 
-import java.util.Map;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskModel;
 import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import java.util.Map;
+
 /**
  * A mix-in Jackson class to convert Samza's ContainerModel to/from JSON.
  */
 public abstract class JsonContainerModelMixIn {
   @JsonCreator
-  public JsonContainerModelMixIn(@JsonProperty("container-id") int containerId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
+  public JsonContainerModelMixIn(@JsonProperty("processor-id") String processorId, @JsonProperty("tasks") Map<TaskName, TaskModel> tasks) {
   }
 
+  @Deprecated
   @JsonProperty("container-id")
   abstract int getContainerId();
 
+  @JsonProperty("processor-id")
+  abstract String getProcessorId();
+
   @JsonProperty("tasks")
   abstract Map<TaskName, TaskModel> getTasks();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
index 037b5e2..4b0c404 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonJobModelMixIn.java
@@ -30,12 +30,12 @@ import org.codehaus.jackson.annotate.JsonProperty;
  */
 public abstract class JsonJobModelMixIn {
   @JsonCreator
-  public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<Integer, ContainerModel> containers) {
+  public JsonJobModelMixIn(@JsonProperty("config") Config config, @JsonProperty("containers") Map<String, ContainerModel> containers) {
   }
 
   @JsonProperty("config")
   abstract Config getConfig();
 
   @JsonProperty("containers")
-  abstract Map<Integer, ContainerModel> getContainers();
+  abstract Map<String, ContainerModel> getContainers();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 83e6b8c..f8c4d43 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -19,10 +19,8 @@
 
 package org.apache.samza.serializers.model;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
@@ -50,6 +48,10 @@ import org.codehaus.jackson.map.introspect.AnnotatedMethod;
 import org.codehaus.jackson.map.module.SimpleModule;
 import org.codehaus.jackson.type.TypeReference;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * <p>
  * A collection of utility classes and (de)serializers to make Samza's job model
@@ -89,10 +91,30 @@ public class SamzaObjectMapper {
     mapper.getSerializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(TaskModel.class, JsonTaskModelMixIn.class);
     mapper.getSerializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
-    mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
     mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
     mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
 
+    module.addDeserializer(ContainerModel.class, new JsonDeserializer<ContainerModel>() {
+      // Builds a ContainerModel from its JSON tree: container-id is required, processor-id is optional.
+      @Override
+      public ContainerModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+        ObjectCodec oc = jp.getCodec();
+        JsonNode node = oc.readTree(jp);
+        // Validate before dereferencing: reading the value first would fail with an NPE instead of this error.
+        JsonNode containerIdNode = node.get("container-id");
+        if (containerIdNode == null) {
+          throw new SamzaException("JobModel did not contain a container-id. This can never happen. JobModel corrupt!");
+        }
+        int containerId = containerIdNode.getIntValue();
+        // A model without a processor-id falls back to the stringified container-id.
+        JsonNode processorIdNode = node.get("processor-id");
+        String processorId = processorIdNode == null ? String.valueOf(containerId) : processorIdNode.getTextValue();
+        Map<TaskName, TaskModel> tasksMapping =
+            OBJECT_MAPPER.readValue(node.get("tasks"), new TypeReference<Map<TaskName, TaskModel>>() { });
+        return new ContainerModel(processorId, containerId, tasksMapping);
+      }
+    });
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
index b2927f4..7efc6df 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
@@ -19,22 +19,25 @@
 package org.apache.samza.standalone;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -63,28 +66,37 @@ import java.util.Map;
  * */
 public class StandaloneJobCoordinator implements JobCoordinator {
   private static final Logger log = LoggerFactory.getLogger(StandaloneJobCoordinator.class);
-  private final int processorId;
+  private final String processorId;
   private final Config config;
   private final JobModel jobModel;
   private final SamzaContainerController containerController;
 
   @VisibleForTesting
   StandaloneJobCoordinator(
-      int processorId,
+      ProcessorIdGenerator processorIdGenerator,
       Config config,
       SamzaContainerController containerController,
       JobModel jobModel) {
-    this.processorId = processorId;
+    this.processorId = processorIdGenerator.generateProcessorId(config);
     this.config = config;
     this.containerController = containerController;
     this.jobModel = jobModel;
   }
 
-  public StandaloneJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
-    this.processorId = processorId;
+  public StandaloneJobCoordinator(Config config, SamzaContainerController containerController) {
     this.config = config;
     this.containerController = containerController;
 
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {     // TODO: This check to be removed after 0.13+
+      this.processorId = appConfig.getProcessorId();
+    } else {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(
+              new ApplicationConfig(config).getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      this.processorId = idGenerator.generateProcessorId(config);
+    }
+
     JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
     Map<String, SystemAdmin> systemAdmins = new HashMap<>();
     for (String systemName: systemConfig.getSystemNames()) {
@@ -113,7 +125,7 @@ public class StandaloneJobCoordinator implements JobCoordinator {
     // No-op
     JobModel jobModel = getJobModel();
     containerController.startContainer(
-        jobModel.getContainers().get(processorId),
+        jobModel.getContainers().get(getProcessorId()),
         jobModel.getConfig(),
         jobModel.maxChangeLogStreamPartitions);
   }
@@ -137,8 +149,8 @@ public class StandaloneJobCoordinator implements JobCoordinator {
   }
 
   @Override
-  public int getProcessorId() {
-    return this.processorId;
+  public String getProcessorId() {
+    return processorId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
index 7ca85c0..eada6e9 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
@@ -25,7 +25,7 @@ import org.apache.samza.processor.SamzaContainerController;
 
 public class StandaloneJobCoordinatorFactory  implements JobCoordinatorFactory {
   @Override
-  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
-    return new StandaloneJobCoordinator(processorId, config, containerController);
+  public JobCoordinator getJobCoordinator(Config config, SamzaContainerController containerController) {
+    return new StandaloneJobCoordinator(config, containerController);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 9471a23..e50f221 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -65,7 +65,7 @@ public class StorageRecovery extends CommandLine {
   private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
   private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
   private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
-  private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>();
+  private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>();
   private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
   private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
 
@@ -211,7 +211,7 @@ public class StorageRecovery extends CommandLine {
 
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
-      SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks()
+      SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getProcessorId(), jobConfig, containerModel.getTasks()
           .keySet());
 
       for (TaskModel taskModel : containerModel.getTasks().values()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
index f2b389b..3680b4f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
+++ b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
@@ -19,6 +19,10 @@
 
 package org.apache.samza.util;
 
+import org.apache.samza.config.ConfigException;
+
+import java.lang.reflect.Constructor;
+
 public class ClassLoaderHelper {
 
   public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -26,4 +30,31 @@ public class ClassLoaderHelper {
     T instance = clazz.newInstance();
     return instance;
   }
+
+  /**
+   * Loads {@code className}, verifies that it is assignable to {@code classType}, and instantiates it through its
+   * public no-argument constructor.
+   *
+   * @param className fully qualified name of the class to instantiate
+   * @param classType type the loaded class is required to be assignable to
+   * @param <T> type of the returned instance
+   * @return a new instance of {@code className}, typed as {@code classType}
+   * @throws ConfigException if the class is not of type {@code classType}, or if loading or instantiation fails
+   */
+  public static <T> T fromClassName(String className, Class<T> classType) {
+    try {
+      Class<?> clazz = Class.forName(className);
+      if (!classType.isAssignableFrom(clazz)) {
+        throw new ConfigException(String.format("Class %s is not of type %s", className, classType));
+      }
+      Constructor<?> constructor = clazz.getConstructor();
+      // Class#cast is a checked conversion, unlike the unchecked (T) cast.
+      return classType.cast(constructor.newInstance());
+    } catch (ConfigException e) {
+      // Surface the type-mismatch error as-is rather than re-wrapping it in the generic message below.
+      throw e;
+    } catch (Exception e) {
+      throw new ConfigException(String.format("Problem in loading %s class %s", classType, className), e);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a7da1840/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index cc454e3..21a6b03 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -26,12 +26,10 @@ import org.apache.samza.coordinator.CoordinationServiceFactory;
 
 
 public class ZkCoordinationServiceFactory implements CoordinationServiceFactory {
-
-
   synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
     ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-    ZkUtils zkUtils = new ZkUtils(participantId, new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs());
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
     return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, debounceTimer);
   }