You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/08/04 01:29:45 UTC

[2/2] samza git commit: SAMZA-682: refactored coordinator stream messages.

SAMZA-682: refactored coordinator stream messages.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/59ec3b78
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/59ec3b78
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/59ec3b78

Branch: refs/heads/master
Commit: 59ec3b789c81d17e820e1587be66e756c4daeaa6
Parents: c9fa796
Author: József Márton Jung <j....@levi9.com>
Authored: Mon Aug 3 16:30:15 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Aug 3 16:30:15 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  12 +
 .../samza/checkpoint/CheckpointManager.java     |  44 +-
 .../apache/samza/container/LocalityManager.java |  44 +-
 .../AbstractCoordinatorStreamManager.java       | 112 ++++
 .../stream/CoordinatorStreamMessage.java        | 527 -------------------
 .../stream/CoordinatorStreamSystemConsumer.java |   6 +-
 .../stream/CoordinatorStreamSystemProducer.java |   4 +-
 .../stream/CoordinatorStreamWriter.java         |   5 +-
 .../messages/CoordinatorStreamMessage.java      | 321 +++++++++++
 .../coordinator/stream/messages/Delete.java     |  58 ++
 .../stream/messages/SetChangelogMapping.java    |  64 +++
 .../stream/messages/SetCheckpoint.java          |  74 +++
 .../coordinator/stream/messages/SetConfig.java  |  59 +++
 .../messages/SetContainerHostMapping.java       |  95 ++++
 .../org/apache/samza/job/model/JobModel.java    |   2 +-
 .../storage/ChangelogPartitionManager.java      |  45 +-
 .../samza/coordinator/JobCoordinator.scala      |   2 +-
 .../scala/org/apache/samza/job/JobRunner.scala  |   5 +-
 .../MockCoordinatorStreamWrappedConsumer.java   |   9 +-
 .../stream/TestCoordinatorStreamMessage.java    |  10 +-
 .../TestCoordinatorStreamSystemConsumer.java    |  15 +-
 .../TestCoordinatorStreamSystemProducer.java    |   9 +-
 .../stream/TestCoordinatorStreamWriter.java     |   1 +
 .../samza/container/TestSamzaContainer.scala    |   2 +-
 .../resources/scalate/WEB-INF/views/index.scaml |   4 +-
 25 files changed, 883 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6654319..24ed680 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -52,6 +52,8 @@
         <allow pkg="org.apache.samza.config" />
 
         <subpackage name="model">
+            <allow pkg="org.apache.samza.coordinator.stream.messages" />
+
             <allow class="org.apache.samza.Partition" />
             <allow class="org.apache.samza.container.TaskName" />
             <allow class="org.apache.samza.system.SystemStreamPartition" />
@@ -101,6 +103,7 @@
     <subpackage name="container">
         <allow pkg="org.apache.samza.config" />
         <allow pkg="org.apache.samza.coordinator.stream" />
+        <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
         <subpackage name="grouper">
             <subpackage name="stream">
                 <allow pkg="org.apache.samza.container" />
@@ -120,10 +123,12 @@
         <allow pkg="org.apache.samza.system" />
         <allow pkg="org.apache.samza.serializers" />
         <allow pkg="org.apache.samza.util" />
+        <allow pkg="org.apache.samza.coordinator.stream.messages" />
 
         <allow class="org.apache.samza.Partition" />
         <allow class="org.apache.samza.SamzaException" />
         <allow class="joptsimple.OptionSet" />
+        <allow class="org.apache.samza.container.TaskName" />
     </subpackage>
 
     <subpackage name="checkpoint">
@@ -134,6 +139,7 @@
         <allow pkg="org.apache.samza.system" />
 
         <allow class="org.apache.samza.SamzaException" />
+        <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
     </subpackage>
 
     <subpackage name="storage">
@@ -143,6 +149,7 @@
         <allow pkg="org.apache.samza.serializers" />
         <allow pkg="org.apache.samza.system" />
         <allow pkg="org.apache.samza.task" />
+        <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
         <allow pkg="org.apache.samza.util" />
         <allow pkg="org.apache.samza.job" />
         <allow pkg="org.apache.samza.config" />
@@ -181,4 +188,9 @@
         </subpackage>
     </subpackage>
 
+    <subpackage name="manager">
+        <allow pkg="org.apache.samza.coordinator.stream" />
+        <allow class="org.apache.samza.container.TaskName" />
+    </subpackage>
+
 </import-control>

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 7445996..0185751 100644
--- a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -22,12 +22,12 @@ package org.apache.samza.checkpoint;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,34 +36,18 @@ import org.slf4j.LoggerFactory;
  * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
  * CoordinatorStream underneath to do this.
  */
-public class CheckpointManager {
+public class CheckpointManager extends AbstractCoordinatorStreamManager {
 
   private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
-  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
-  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
   private final Map<TaskName, Checkpoint> taskNamesToOffsets;
   private final HashSet<TaskName> taskNames;
-  private String source;
-
-  public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
-      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
-    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
-    this.coordinatorStreamProducer = coordinatorStreamProducer;
-    taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
-    taskNames = new HashSet<TaskName>();
-    this.source = "Unknown";
-  }
 
   public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
       CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
       String source) {
-    this(coordinatorStreamProducer, coordinatorStreamConsumer);
-    this.source = source;
-  }
-
-  public void start() {
-    coordinatorStreamProducer.start();
-    coordinatorStreamConsumer.start();
+    super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
+    taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
+    taskNames = new HashSet<TaskName>();
   }
 
   /**
@@ -73,8 +57,8 @@ public class CheckpointManager {
   public void register(TaskName taskName) {
     log.debug("Adding taskName {} to {}", taskName, this);
     taskNames.add(taskName);
-    coordinatorStreamConsumer.register();
-    coordinatorStreamProducer.register(taskName.getTaskName());
+    registerCoordinatorStreamConsumer();
+    registerCoordinatorStreamProducer(taskName.getTaskName());
   }
 
   /**
@@ -84,8 +68,7 @@ public class CheckpointManager {
    */
   public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
     log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
-    SetCheckpoint checkPointMessage = new SetCheckpoint(source, taskName.getTaskName(), checkpoint);
-    coordinatorStreamProducer.send(checkPointMessage);
+    send(new SetCheckpoint(getSource(), taskName.getTaskName(), checkpoint));
   }
 
   /**
@@ -96,8 +79,7 @@ public class CheckpointManager {
   public Checkpoint readLastCheckpoint(TaskName taskName) {
     // Bootstrap each time to make sure that we are caught up with the stream, the bootstrap will just catch up on consecutive calls
     log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
-    Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetCheckpoint.TYPE);
-    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+    for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetCheckpoint.TYPE)) {
       SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
       TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
       if (taskNames.contains(taskNameInCheckpoint)) {
@@ -108,8 +90,4 @@ public class CheckpointManager {
     return taskNamesToOffsets.get(taskName);
   }
 
-  public void stop() {
-    coordinatorStreamConsumer.stop();
-    coordinatorStreamProducer.stop();
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 55c258f..c567bf4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,55 +19,53 @@
 
 package org.apache.samza.container;
 
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 
 /**
  * Locality Manager is used to persist and read the container-to-host
  * assignment information from the coordinator stream
  * */
-public class LocalityManager {
+public class LocalityManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
-  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
-  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
-  private static final String SOURCE = "SamzaContainer-";
   private Map<Integer, Map<String, String>> containerToHostMapping;
 
   public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
                          CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
-    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
-    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaContainer-");
     this.containerToHostMapping = new HashMap<>();
   }
 
-  public void start() {
-    coordinatorStreamProducer.start();
-    coordinatorStreamConsumer.start();
+  /**
+   * This method is not supported in {@link LocalityManager}. Use {@link LocalityManager#register(String)} instead.
+   *
+   * @throws UnsupportedOperationException always, because a {@link TaskName} cannot be registered with this manager
+   */
+  public void register(TaskName taskName) {
+    throw new UnsupportedOperationException("TaskName cannot be registered with LocalityManager");
   }
 
-  public void stop() {
-    coordinatorStreamConsumer.stop();
-    coordinatorStreamProducer.stop();
-  }
-
-  /*
-   * Register with source suffix that is containerId
-   * */
+  /**
+   * Registers the locality manager with a source suffix that is the container id.
+   *
+   * @param sourceSuffix the source suffix which is a container id
+   */
   public void register(String sourceSuffix) {
-    coordinatorStreamConsumer.register();
-    coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix);
+    registerCoordinatorStreamConsumer();
+    registerCoordinatorStreamProducer(getSource() + sourceSuffix);
   }
 
   public Map<Integer, Map<String, String>> readContainerLocality() {
     Map<Integer, Map<String, String>> allMappings = new HashMap<>();
-    for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) {
+    for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
       Map<String, String> localityMappings = new HashMap<>();
       localityMappings.put(SetContainerHostMapping.IP_KEY, mapping.getHostLocality());
@@ -88,7 +86,7 @@ public class LocalityManager {
     } else {
       log.info("Container {} started at {}", containerId, hostHttpAddress);
     }
-    coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
+    send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
     Map<String, String> mappings = new HashMap<>();
     mappings.put(SetContainerHostMapping.IP_KEY, hostHttpAddress);
     mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
new file mode 100644
index 0000000..ca97ce8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+
+import java.util.Set;
+
+/**
+ * Abstract class which handles the common functionality for coordinator stream consumer and producer
+ */
+public abstract class AbstractCoordinatorStreamManager {
+  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+  private final String source;
+
+  /**
+   * Creates a new {@link AbstractCoordinatorStreamManager} with a given coordinator stream producer, consumer and with a given source.
+   * @param coordinatorStreamProducer the {@link CoordinatorStreamSystemProducer} which should be used with the {@link AbstractCoordinatorStreamManager}
+   * @param coordinatorStreamConsumer the {@link CoordinatorStreamSystemConsumer} which should be used with the {@link AbstractCoordinatorStreamManager}
+   * @param source the source for the coordinator stream producer
+   */
+  protected AbstractCoordinatorStreamManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, CoordinatorStreamSystemConsumer coordinatorStreamConsumer, String source) {
+    this.coordinatorStreamProducer = coordinatorStreamProducer;
+    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+    this.source = source;
+  }
+
+  /**
+   * Starts the underlying coordinator stream producer and consumer.
+   */
+  public void start() {
+    coordinatorStreamProducer.start();
+    coordinatorStreamConsumer.start();
+  }
+
+  /**
+   * Stops the underlying coordinator stream producer and consumer.
+   */
+  public void stop() {
+    coordinatorStreamConsumer.stop();
+    coordinatorStreamProducer.stop();
+  }
+
+  /**
+   * Sends a {@link CoordinatorStreamMessage} using the underlying system producer.
+   * @param message message which should be sent to producer
+   */
+  public void send(CoordinatorStreamMessage message) {
+    coordinatorStreamProducer.send(message);
+  }
+
+  /**
+   * Returns a set of messages from the bootstrapped stream, selected by the given argument.
+   * @param source selector forwarded to the consumer; callers in this commit pass a message type (e.g. {@code SetCheckpoint.TYPE})
+   * @return a set of {@link CoordinatorStreamMessage} if messages exist for the given selector, else an empty set
+   */
+  public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
+    return coordinatorStreamConsumer.getBootstrappedStream(source);
+  }
+
+  /**
+   * Register the coordinator stream consumer.
+   */
+  protected void registerCoordinatorStreamConsumer() {
+    coordinatorStreamConsumer.register();
+  }
+
+  /**
+   * Registers the coordinator stream producer for a given source.
+   * @param source the source to register
+   */
+  protected void registerCoordinatorStreamProducer(String source) {
+    coordinatorStreamProducer.register(source);
+  }
+
+  /**
+   * Returns the source name which is managed by {@link AbstractCoordinatorStreamManager}.
+   * @return the source name
+   */
+  protected String getSource() {
+    return source;
+  }
+
+  /**
+   * Registers a consumer and a producer. Every subclass should implement its own logic for registration.<br><br>
+   * Registering a single consumer and a single producer can be done with {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamConsumer()}
+   * and {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamProducer(String)} methods respectively.<br>
+   * These methods can be used in the concrete implementation of this register method.
+   *
+   * @param taskName name which should be used with the producer
+   */
+  public abstract void register(TaskName taskName);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
deleted file mode 100644
index e5ab4fb..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * Represents a message for the job coordinator. All messages in the coordinator
- * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
- * messages are modeled as key/value pairs. The key is a list of well defined
- * fields: version, type, and key. The value is a map. There are some
- * pre-defined fields (such as timestamp, host, etc) for the value map, which
- * are common to all messages.
- * </p>
- *
- * <p>
- * The full structure for a CoordinatorStreamMessage is:
- * </p>
- *
- * <pre>
- * key =&gt; [1, "set-config", "job.name"] 
- *
- * message =&gt; {
- *   "host": "192.168.0.1",
- *   "username": "criccomini",
- *   "source": "job-runner",
- *   "timestamp": 123456789,
- *   "values": {
- *     "value": "my-job-name"
- *   }
- * }
- * </pre>
- *
- * Where the key's structure is:
- *
- * <pre>
- * key =&gt; [&lt;version&gt;, &lt;type&gt;, &lt;key&gt;]
- * </pre>
- *
- * <p>
- * Note that the white space in the above JSON blobs are done for legibility.
- * Over the wire, the JSON should be compact, and no unnecessary white space
- * should be used. This is extremely important for key serialization, since a
- * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
- * be evaluated as two different keys, and Kafka will not log compact them (if
- * Kafka is used as the underlying system for a coordinator stream).
- * </p>
- *
- * <p>
- * The "values" map in the message is defined on a per-message-type basis. For
- * set-config messages, there is just a single key/value pair, where the "value"
- * key is defined. For offset messages, there will be multiple key/values pairs
- * in "values" (one for each SystemStreamPartition/offset pair for a given
- * TaskName).
- * </p>
- *
- * <p>
- * The most important fields are type, key, and values. The type field (defined
- * as index 1 in the key list) defines the kind of message, the key (defined as
- * index 2 in the key list) defines a key to associate with the values, and the
- * values map defines a set of values associated with the type. A concrete
- * example would be a config message of type "set-config" with key "job.name"
- * and values {"value": "my-job-name"}.
- * </p>
- */
-public class CoordinatorStreamMessage {
-  public static final int VERSION_INDEX = 0;
-  public static final int TYPE_INDEX = 1;
-  public static final int KEY_INDEX = 2;
-
-  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
-
-  /**
-   * Protocol version for coordinator stream messages. This version number must
-   * be incremented any time new messages are added to the coordinator stream,
-   * or changes are made to the key/message headers.
-   */
-  public static final int VERSION = 1;
-
-  /**
-   * Contains all key fields. Currently, this includes the type of the message,
-   * the key associated with the type (e.g. type: set-config key: job.name), and
-   * the version of the protocol. The indices are defined as the INDEX static
-   * variables above.
-   */
-  private final Object[] keyArray;
-
-  /**
-   * Contains all fields for the message. This includes who sent the message,
-   * the host, etc. It also includes a "values" map, which contains all values
-   * associated with the key of the message. If set-config/job.name were used as
-   * the type/key of the message, then values would contain
-   * {"value":"my-job-name"}.
-   */
-  private final Map<String, Object> messageMap;
-  private boolean isDelete;
-
-  public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
-    this(message.getKeyArray(), message.getMessageMap());
-  }
-
-  public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
-    this.keyArray = keyArray;
-    this.messageMap = messageMap;
-    this.isDelete = messageMap == null;
-  }
-
-  public CoordinatorStreamMessage(String source) {
-    this(source, new Object[] {Integer.valueOf(VERSION), null, null}, new HashMap<String, Object>());
-  }
-
-  public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
-    this(keyArray, messageMap);
-    if (!isDelete) {
-      this.messageMap.put("values", new HashMap<String, String>());
-      setSource(source);
-      setUsername(System.getProperty("user.name"));
-      setTimestamp(System.currentTimeMillis());
-
-      try {
-        setHost(InetAddress.getLocalHost().getHostAddress());
-      } catch (UnknownHostException e) {
-        log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
-        setHost("");
-      }
-    }
-
-    setVersion(VERSION);
-  }
-
-  protected void setIsDelete(boolean isDelete) {
-    this.isDelete = isDelete;
-  }
-
-  protected void setHost(String host) {
-    messageMap.put("host", host);
-  }
-
-  protected void setUsername(String username) {
-    messageMap.put("username", username);
-  }
-
-  protected void setSource(String source) {
-    messageMap.put("source", source);
-  }
-
-  protected void setTimestamp(long timestamp) {
-    messageMap.put("timestamp", Long.valueOf(timestamp));
-  }
-
-  protected void setVersion(int version) {
-    this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
-  }
-
-  protected void setType(String type) {
-    this.keyArray[TYPE_INDEX] = type;
-  }
-
-  protected void setKey(String key) {
-    this.keyArray[KEY_INDEX] = key;
-  }
-
-  @SuppressWarnings("unchecked")
-  protected Map<String, String> getMessageValues() {
-    return (Map<String, String>) this.messageMap.get("values");
-  }
-
-  protected String getMessageValue(String key) {
-    return getMessageValues().get(key);
-  }
-
-  /**
-   * @param key
-   *           The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
-   *           easy mutation of the coordinator stream outside of Samza (scripts)
-   * @param value
-   *           The value corresponding to the key, should also be a simple string
-   */
-  protected void putMessageValue(String key, String value) {
-    getMessageValues().put(key, value);
-  }
-
-  /**
-   * The type of the message is used to convert a generic
-   * CoordinatorStreamMessage into a specific message, such as a SetConfig
-   * message.
-   *
-   * @return The type of the message.
-   */
-  public String getType() {
-    return (String) this.keyArray[TYPE_INDEX];
-  }
-
-  /**
-   * @return The whole key map including both the key and type of the message.
-   */
-  public Object[] getKeyArray() {
-    return this.keyArray;
-  }
-
-  /**
-   * @return Whether the message signifies a delete or not.
-   */
-  public boolean isDelete() {
-    return isDelete;
-  }
-
-  /**
-   * @return The username of a message.
-   */
-  public String getUsername() {
-    return (String) this.messageMap.get("username");
-  }
-
-  /**
-   * @return The timestamp of a message.
-   */
-  public long getTimestamp() {
-    return (Long) this.messageMap.get("timestamp");
-  }
-
-  /**
-   * @return The whole message map including header information.
-   */
-  public Map<String, Object> getMessageMap() {
-    if (!isDelete) {
-      Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
-      // To make sure the values is immutable, we overwrite it with an immutable version of the the values map.
-      immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
-      return Collections.unmodifiableMap(immutableMap);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * @return The source that sent the coordinator message. This is a string
-   *         defined by the sender.
-   */
-  public String getSource() {
-    return (String) this.messageMap.get("source");
-  }
-
-  /**
-   * @return The protocol version that the message conforms to.
-   */
-  public int getVersion() {
-    return (Integer) this.keyArray[VERSION_INDEX];
-  }
-
-  /**
-   * @return The key for a message. The key's meaning is defined by the type of
-   *         the message.
-   */
-  public String getKey() {
-    return (String) this.keyArray[KEY_INDEX];
-  }
-
-  @Override
-  public String toString() {
-    return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + (isDelete ? 1231 : 1237);
-    result = prime * result + Arrays.hashCode(keyArray);
-    result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
-    if (isDelete != other.isDelete)
-      return false;
-    if (!Arrays.equals(keyArray, other.keyArray))
-      return false;
-    if (messageMap == null) {
-      if (other.messageMap != null)
-        return false;
-    } else if (!messageMap.equals(other.messageMap))
-      return false;
-    return true;
-  }
-
-  /**
-   * A coordinator stream message that tells the job coordinator to set a
-   * specific configuration.
-   */
-  public static class SetConfig extends CoordinatorStreamMessage {
-    public static final String TYPE = "set-config";
-
-    public SetConfig(CoordinatorStreamMessage message) {
-      super(message.getKeyArray(), message.getMessageMap());
-    }
-
-    public SetConfig(String source, String key, String value) {
-      super(source);
-      setType(TYPE);
-      setKey(key);
-      putMessageValue("value", value);
-    }
-
-    public String getConfigValue() {
-      return (String) getMessageValue("value");
-    }
-  }
-
-  public static class Delete extends CoordinatorStreamMessage {
-    public Delete(String source, String key, String type) {
-      this(source, key, type, VERSION);
-    }
-
-    /**
-     * <p>
-     * Delete messages must take the type of another CoordinatorStreamMessage
-     * (e.g. SetConfig) to define the type of message that's being deleted.
-     * Considering Kafka's log compaction, for example, the keys of a message
-     * and its delete key must match exactly:
-     * </p>
-     *
-     * <pre>
-     * k=&gt;[1,"job.name","set-config"] .. v=&gt; {..some stuff..}
-     * v=&gt;[1,"job.name","set-config"] .. v=&gt; null
-     * </pre>
-     *
-     * <p>
-     * Deletes are modeled as a CoordinatorStreamMessage with a null message
-     * map, and a key that's identical to the key map that's to be deleted.
-     * </p>
-     *
-     * @param source
-     *          The source ID of the sender of the delete message.
-     * @param key
-     *          The key to delete.
-     * @param type
-     *          The type of message to delete. Must correspond to one of hte
-     *          other CoordinatorStreamMessages.
-     * @param version
-     *          The protocol version.
-     */
-    public Delete(String source, String key, String type, int version) {
-      super(source);
-      setType(type);
-      setKey(key);
-      setVersion(version);
-      setIsDelete(true);
-    }
-  }
-
-  /**
-   * The SetCheckpoint is used to store the checkpoint messages for a particular task.
-   * The structure looks like:
-   * {
-   * Key: TaskName
-   * Type: set-checkpoint
-   * Source: ContainerID
-   * MessageMap:
-   *  {
-   *     SSP1 : offset,
-   *     SSP2 : offset
-   *  }
-   * }
-   */
-  public static class SetCheckpoint extends CoordinatorStreamMessage {
-    public static final String TYPE = "set-checkpoint";
-
-    public SetCheckpoint(CoordinatorStreamMessage message) {
-      super(message.getKeyArray(), message.getMessageMap());
-    }
-
-    /**
-     *
-     * @param source The source writing the checkpoint
-     * @param key The key for the checkpoint message (Typically task name)
-     * @param checkpoint Checkpoint message to be written to the stream
-     */
-    public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
-      super(source);
-      setType(TYPE);
-      setKey(key);
-      Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
-      for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
-        putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
-      }
-    }
-
-    public Checkpoint getCheckpoint() {
-      Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
-      for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
-        offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
-      }
-      return new Checkpoint(offsetMap);
-    }
-  }
-
-  /**
-   * The SetChanglog is used to store the changelog parition information for a particular task.
-   * The structure looks like:
-   * {
-   * Key: TaskName
-   * Type: set-changelog
-   * Source: ContainerID
-   * MessageMap:
-   *  {
-   *     "Partition" : partitionNumber (They key is just a dummy key here, the value contains the actual partition)
-   *  }
-   * }
-   */
-  public static class SetChangelogMapping extends CoordinatorStreamMessage {
-    public static final String TYPE = "set-changelog";
-
-    public SetChangelogMapping(CoordinatorStreamMessage message) {
-      super(message.getKeyArray(), message.getMessageMap());
-    }
-
-    /**
-     *
-     * @param source Source writing the change log mapping
-     * @param taskName The task name to be used in the mapping
-     * @param changelogPartitionNumber The partition to which the task's changelog is mapped to
-     */
-    public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
-      super(source);
-      setType(TYPE);
-      setKey(taskName);
-      putMessageValue("Partition", String.valueOf(changelogPartitionNumber));
-    }
-
-    public String getTaskName() {
-      return getKey();
-    }
-
-    public int getPartition() {
-      return Integer.parseInt(getMessageValue("Partition"));
-    }
-  }
-
-  /**
-   * SetContainerHostMapping is used internally by the samza framework to
-   * persist the container-to-host mappings.
-   *
-   * Structure of the message looks like:
-   * {
-   *     Key: $ContainerId
-   *     Type: set-container-host-assignment
-   *     Source: "SamzaContainer-$ContainerId"
-   *     MessageMap:
-   *     {
-   *         ip: InetAddressString,
-   *         jmx-url: jmxAddressString
-   *         jmx-tunneling-url: jmxTunnelingAddressString
-   *     }
-   * }
-   * */
-  public static class SetContainerHostMapping extends CoordinatorStreamMessage {
-    public static final String TYPE = "set-container-host-assignment";
-    public static final String IP_KEY = "ip";
-    public static final String JMX_URL_KEY = "jmx-url";
-    public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
-
-    public SetContainerHostMapping(CoordinatorStreamMessage message) {
-      super(message.getKeyArray(), message.getMessageMap());
-    }
-
-    public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
-      super(source);
-      setType(TYPE);
-      setKey(key);
-      putMessageValue(IP_KEY, hostHttpAddress);
-      putMessageValue(JMX_URL_KEY, jmxAddress);
-      putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
-    }
-
-    public String getHostLocality() {
-      return getMessageValue(IP_KEY);
-
-    }
-
-    public String getJmxUrl() {
-      return getMessageValue(JMX_URL_KEY);
-    }
-
-    public String getJmxTunnelingUrl() {
-      return getMessageValue(JMX_TUNNELING_URL_KEY);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index b1078bd..2277a73 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -30,6 +30,8 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -146,12 +148,12 @@ public class CoordinatorStreamSystemConsumer {
         CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
         log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
         bootstrappedStreamSet.add(coordinatorStreamMessage);
-        if (CoordinatorStreamMessage.SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+        if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
           String configKey = coordinatorStreamMessage.getKey();
           if (coordinatorStreamMessage.isDelete()) {
             configMap.remove(configKey);
           } else {
-            String configValue = new CoordinatorStreamMessage.SetConfig(coordinatorStreamMessage).getConfigValue();
+            String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
             configMap.put(configKey, configValue);
           }
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 92f8907..42ae00b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemAdmin;
@@ -126,7 +128,7 @@ public class CoordinatorStreamSystemProducer {
   public void writeConfig(String source, Config config) {
     log.debug("Writing config: {}", config);
     for (Map.Entry<String, String> configPair : config.entrySet()) {
-      send(new CoordinatorStreamMessage.SetConfig(source, configPair.getKey(), configPair.getValue()));
+      send(new SetConfig(source, configPair.getKey(), configPair.getValue()));
     }
     systemProducer.flush(source);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index f769756..77594dc 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator.stream;
 
 import joptsimple.OptionSet;
 import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class CoordinatorStreamWriter {
 
   private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);
   public final static String SOURCE = "coordinator-stream-writer";
-  public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE;
+  public static final String SET_CONFIG_TYPE = SetConfig.TYPE;
 
   private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;
 
@@ -94,7 +95,7 @@ public class CoordinatorStreamWriter {
    */
   private void sendSetConfigMessage(String key, String value) {
     log.info("sent SetConfig message with key = " + key + " and value = " + value);
-    coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
+    coordinatorStreamSystemProducer.send(new SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
new file mode 100644
index 0000000..e3adc85
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Represents a message for the job coordinator. All messages in the coordinator
+ * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
+ * messages are modeled as key/value pairs. The key is a list of well defined
+ * fields: version, type, and key. The value is a map. There are some
+ * pre-defined fields (such as timestamp, host, etc) for the value map, which
+ * are common to all messages.
+ * </p>
+ *
+ * <p>
+ * The full structure for a CoordinatorStreamMessage is:
+ * </p>
+ *
+ * <pre>
+ * key =&gt; [1, "set-config", "job.name"] 
+ *
+ * message =&gt; {
+ *   "host": "192.168.0.1",
+ *   "username": "criccomini",
+ *   "source": "job-runner",
+ *   "timestamp": 123456789,
+ *   "values": {
+ *     "value": "my-job-name"
+ *   }
+ * }
+ * </pre>
+ *
+ * Where the key's structure is:
+ *
+ * <pre>
+ * key =&gt; [&lt;version&gt;, &lt;type&gt;, &lt;key&gt;]
+ * </pre>
+ *
+ * <p>
+ * Note that the white space in the above JSON blobs is added for legibility.
+ * Over the wire, the JSON should be compact, and no unnecessary white space
+ * should be used. This is extremely important for key serialization, since a
+ * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
+ * be evaluated as two different keys, and Kafka will not log compact them (if
+ * Kafka is used as the underlying system for a coordinator stream).
+ * </p>
+ *
+ * <p>
+ * The "values" map in the message is defined on a per-message-type basis. For
+ * set-config messages, there is just a single key/value pair, where the "value"
+ * key is defined. For offset messages, there will be multiple key/value pairs
+ * in "values" (one for each SystemStreamPartition/offset pair for a given
+ * TaskName).
+ * </p>
+ *
+ * <p>
+ * The most important fields are type, key, and values. The type field (defined
+ * as index 1 in the key list) defines the kind of message, the key (defined as
+ * index 2 in the key list) defines a key to associate with the values, and the
+ * values map defines a set of values associated with the type. A concrete
+ * example would be a config message of type "set-config" with key "job.name"
+ * and values {"value": "my-job-name"}.
+ * </p>
+ */
+public class CoordinatorStreamMessage {
+  public static final int VERSION_INDEX = 0;
+  public static final int TYPE_INDEX = 1;
+  public static final int KEY_INDEX = 2;
+
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
+
+  /**
+   * Protocol version for coordinator stream messages. This version number must
+   * be incremented any time new messages are added to the coordinator stream,
+   * or changes are made to the key/message headers.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * Contains all key fields. Currently, this includes the type of the message,
+   * the key associated with the type (e.g. type: set-config key: job.name), and
+   * the version of the protocol. The indices are defined as the INDEX static
+   * variables above.
+   */
+  private final Object[] keyArray;
+
+  /**
+   * Contains all fields for the message. This includes who sent the message,
+   * the host, etc. It also includes a "values" map, which contains all values
+   * associated with the key of the message. If set-config/job.name were used as
+   * the type/key of the message, then values would contain
+   * {"value":"my-job-name"}.
+   */
+  private final Map<String, Object> messageMap;
+  private boolean isDelete;
+
+  public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
+    this(message.getKeyArray(), message.getMessageMap());
+  }
+
+  public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
+    this.keyArray = keyArray;
+    this.messageMap = messageMap;
+    this.isDelete = messageMap == null;
+  }
+
+  public CoordinatorStreamMessage(String source) {
+    this(source, new Object[] {Integer.valueOf(VERSION), null, null}, new HashMap<String, Object>());
+  }
+
+  public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
+    this(keyArray, messageMap);
+    if (!isDelete) {
+      this.messageMap.put("values", new HashMap<String, String>());
+      setSource(source);
+      setUsername(System.getProperty("user.name"));
+      setTimestamp(System.currentTimeMillis());
+
+      try {
+        setHost(InetAddress.getLocalHost().getHostAddress());
+      } catch (UnknownHostException e) {
+        log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
+        setHost("");
+      }
+    }
+
+    setVersion(VERSION);
+  }
+
+  protected void setIsDelete(boolean isDelete) {
+    this.isDelete = isDelete;
+  }
+
+  protected void setHost(String host) {
+    messageMap.put("host", host);
+  }
+
+  protected void setUsername(String username) {
+    messageMap.put("username", username);
+  }
+
+  protected void setSource(String source) {
+    messageMap.put("source", source);
+  }
+
+  protected void setTimestamp(long timestamp) {
+    messageMap.put("timestamp", Long.valueOf(timestamp));
+  }
+
+  protected void setVersion(int version) {
+    this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
+  }
+
+  protected void setType(String type) {
+    this.keyArray[TYPE_INDEX] = type;
+  }
+
+  protected void setKey(String key) {
+    this.keyArray[KEY_INDEX] = key;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Map<String, String> getMessageValues() {
+    return (Map<String, String>) this.messageMap.get("values");
+  }
+
+  protected String getMessageValue(String key) {
+    return getMessageValues().get(key);
+  }
+
+  /**
+   * @param key
+   *           The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
+   *           easy mutation of the coordinator stream outside of Samza (scripts)
+   * @param value
+   *           The value corresponding to the key, should also be a simple string
+   */
+  protected void putMessageValue(String key, String value) {
+    getMessageValues().put(key, value);
+  }
+
+  /**
+   * The type of the message is used to convert a generic
+   * CoordinatorStreamMessage into a specific message, such as a SetConfig
+   * message.
+   *
+   * @return The type of the message.
+   */
+  public String getType() {
+    return (String) this.keyArray[TYPE_INDEX];
+  }
+
+  /**
+   * @return The whole key array, including the version, type, and key of the message.
+   */
+  public Object[] getKeyArray() {
+    return this.keyArray;
+  }
+
+  /**
+   * @return Whether the message signifies a delete or not.
+   */
+  public boolean isDelete() {
+    return isDelete;
+  }
+
+  /**
+   * @return The username of a message.
+   */
+  public String getUsername() {
+    return (String) this.messageMap.get("username");
+  }
+
+  /**
+   * @return The timestamp of a message.
+   */
+  public long getTimestamp() {
+    return (Long) this.messageMap.get("timestamp");
+  }
+
+  /**
+   * @return The whole message map including header information.
+   */
+  public Map<String, Object> getMessageMap() {
+    if (!isDelete) {
+      Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
+      // To make sure the values map is immutable, we overwrite it with an immutable version of the values map.
+      immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
+      return Collections.unmodifiableMap(immutableMap);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * @return The source that sent the coordinator message. This is a string
+   *         defined by the sender.
+   */
+  public String getSource() {
+    return (String) this.messageMap.get("source");
+  }
+
+  /**
+   * @return The protocol version that the message conforms to.
+   */
+  public int getVersion() {
+    return (Integer) this.keyArray[VERSION_INDEX];
+  }
+
+  /**
+   * @return The key for a message. The key's meaning is defined by the type of
+   *         the message.
+   */
+  public String getKey() {
+    return (String) this.keyArray[KEY_INDEX];
+  }
+
+  @Override
+  public String toString() {
+    return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (isDelete ? 1231 : 1237);
+    result = prime * result + Arrays.hashCode(keyArray);
+    result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
+    if (isDelete != other.isDelete)
+      return false;
+    if (!Arrays.equals(keyArray, other.keyArray))
+      return false;
+    if (messageMap == null) {
+      if (other.messageMap != null)
+        return false;
+    } else if (!messageMap.equals(other.messageMap))
+      return false;
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
new file mode 100644
index 0000000..964ab39
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+public class Delete extends CoordinatorStreamMessage {
+  public Delete(String source, String key, String type) {
+    this(source, key, type, VERSION);
+  }
+
+  /**
+   * <p>
+   * Delete messages must take the type of another CoordinatorStreamMessage
+   * (e.g. SetConfig) to define the type of message that's being deleted.
+   * Considering Kafka's log compaction, for example, the keys of a message
+   * and its delete key must match exactly:
+   * </p>
+   *
+   * <pre>
+   * k=&gt;[1,"set-config","job.name"] .. v=&gt; {..some stuff..}
+   * k=&gt;[1,"set-config","job.name"] .. v=&gt; null
+   * </pre>
+   *
+   * <p>
+   * Deletes are modeled as a CoordinatorStreamMessage with a null message
+   * map, and a key that's identical to the key map that's to be deleted.
+   * </p>
+   *
+   * @param source  The source ID of the sender of the delete message.
+   * @param key     The key to delete.
+   * @param type    The type of message to delete. Must correspond to one of the
+   *                other CoordinatorStreamMessages.
+   * @param version The protocol version.
+   */
+  public Delete(String source, String key, String type, int version) {
+    super(source);
+    setType(type);
+    setKey(key);
+    setVersion(version);
+    setIsDelete(true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
new file mode 100644
index 0000000..279ae06
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * The {@link SetChangelogMapping} message is used to store the changelog partition information for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-changelog
+ * Source: ContainerID
+ * MessageMap:
+ *  {
+ *     "Partition" : partitionNumber (The key is just a dummy key here, the value contains the actual partition)
+ *  }
+ * }
+ */
+public class SetChangelogMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-changelog";
+
+  private static final String CHANGELOG_VALUE_KEY = "Partition";
+
+  public SetChangelogMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * The change log mapping message is used to store changelog partition information for a given task name.
+   * @param source Source writing the change log mapping
+   * @param taskName The task name to be used in the mapping
+   * @param changelogPartitionNumber The partition to which the task's changelog is mapped
+   */
+  public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
+    super(source);
+    setType(TYPE);
+    setKey(taskName);
+    putMessageValue(CHANGELOG_VALUE_KEY, String.valueOf(changelogPartitionNumber));
+  }
+
+  public String getTaskName() {
+    return getKey();
+  }
+
+  public int getPartition() {
+    return Integer.parseInt(getMessageValue(CHANGELOG_VALUE_KEY));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
new file mode 100644
index 0000000..21afa85
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The SetCheckpoint is used to store the checkpoint messages for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-checkpoint
+ * Source: ContainerID
+ * MessageMap:
+ *  {
+ *     SSP1 : offset,
+ *     SSP2 : offset
+ *  }
+ * }
+ */
+public class SetCheckpoint extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-checkpoint";
+
+  public SetCheckpoint(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * The SetCheckpoint message is used to store the checkpoint for a given task.
+   *
+   * @param source The source writing the checkpoint
+   * @param key The key for the checkpoint message (Typically task name)
+   * @param checkpoint Checkpoint message to be written to the stream
+   */
+  public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+    for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
+      putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
+    }
+  }
+
+  public Checkpoint getCheckpoint() {
+    Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
+    for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
+      offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
+    }
+    return new Checkpoint(offsetMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
new file mode 100644
index 0000000..bbb12f7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * A coordinator stream message that tells the job coordinator to set a
+ * specific configuration.
+ */
+public class SetConfig extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-config";
+  private static final String CONFIG_VALUE_KEY = "value";
+
+  /**
+   * The SetConfig message is used to store a specific configuration.
+   * @param message message to store which holds the configuration.
+   */
+  public SetConfig(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * The SetConfig message is used to store a specific configuration.
+   * This constructor is used to create a SetConfig message for a given source for a specific config key and config value.
+   * @param source the source of the config
+   * @param key the key for the given config
+   * @param value the value for the given config
+   */
+  public SetConfig(String source, String key, String value) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    putMessageValue(CONFIG_VALUE_KEY, value);
+  }
+
+  /**
+   * Returns the configuration value as a string.
+   * @return the configuration value as a string
+   */
+  public String getConfigValue() {
+    return getMessageValue(CONFIG_VALUE_KEY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
new file mode 100644
index 0000000..5455881
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerHostMapping is used internally by the Samza framework to
+ * persist the container-to-host mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ContainerId
+ *     Type: set-container-host-assignment
+ *     Source: "SamzaContainer-$ContainerId"
+ *     MessageMap:
+ *     {
+ *         ip: InetAddressString,
+ *         jmx-url: jmxAddressString,
+ *         jmx-tunneling-url: jmxTunnelingAddressString
+ *     }
+ * }
+ * */
+public class SetContainerHostMapping extends CoordinatorStreamMessage {
+  public static final String TYPE = "set-container-host-assignment";
+  public static final String IP_KEY = "ip";
+  public static final String JMX_URL_KEY = "jmx-url";
+  public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
+
+  /**
+   * SetContainerHostMapping is used to set the container to host mapping information.
+   * @param message the message which holds the container to host information.
+   */
+  public SetContainerHostMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * SetContainerHostMapping is used to set the container to host mapping information.
+   * @param source the source of the message
+   * @param key the key which is used to persist the message
+   * @param hostHttpAddress the IP address of the container
+   * @param jmxAddress the JMX address of the container
+   * @param jmxTunnelingAddress the JMX tunneling address of the container
+   */
+  public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    putMessageValue(IP_KEY, hostHttpAddress);
+    putMessageValue(JMX_URL_KEY, jmxAddress);
+    putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
+  }
+
+  /**
+   * Returns the IP address of the container.
+   * @return the container IP address
+   */
+  public String getHostLocality() {
+    return getMessageValue(IP_KEY);
+
+  }
+
+  /**
+   * Returns the JMX url of the container.
+   * @return the JMX url
+   */
+  public String getJmxUrl() {
+    return getMessageValue(JMX_URL_KEY);
+  }
+
+  /**
+   * Returns the JMX tunneling url of the container.
+   * @return the JMX tunneling url
+   */
+  public String getJmxTunnelingUrl() {
+    return getMessageValue(JMX_TUNNELING_URL_KEY);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index ad6387d..7b59274 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -74,7 +74,7 @@ public class JobModel {
    * Returns the container to host mapping for a given container ID and mapping key
    *
    * @param containerId the ID of the container
-   * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping}
+   * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
    * @return the value if it exists for a given container and key, otherwise an empty string
    */
   public String getContainerToHostValue(Integer containerId, String key) {

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
index 7d3409c..7e274c7 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -21,12 +21,12 @@ package org.apache.samza.storage;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,36 +34,15 @@ import org.slf4j.LoggerFactory;
 /**
  * The Changelog manager is used to persist and read the changelog information from the coordinator stream.
  */
-public class ChangelogPartitionManager {
+public class ChangelogPartitionManager extends AbstractCoordinatorStreamManager {
 
   private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
-  private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
-  private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
   private boolean isCoordinatorConsumerRegistered = false;
-  private String source;
-
-  public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
-      CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
-    this.coordinatorStreamConsumer = coordinatorStreamConsumer;
-    this.coordinatorStreamProducer = coordinatorStreamProducer;
-    this.source = "Unknown";
-  }
 
   public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
       CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
       String source) {
-    this(coordinatorStreamProducer, coordinatorStreamConsumer);
-    this.source = source;
-  }
-
-  public void start() {
-    coordinatorStreamProducer.start();
-    coordinatorStreamConsumer.start();
-  }
-
-  public void stop() {
-    coordinatorStreamConsumer.stop();
-    coordinatorStreamProducer.stop();
+    super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
   }
 
   /**
@@ -73,10 +52,10 @@ public class ChangelogPartitionManager {
   public void register(TaskName taskName) {
     log.debug("Adding taskName {} to {}", taskName, this);
     if (!isCoordinatorConsumerRegistered) {
-      coordinatorStreamConsumer.register();
+      registerCoordinatorStreamConsumer();
       isCoordinatorConsumerRegistered = true;
     }
-    coordinatorStreamProducer.register(taskName.getTaskName());
+    registerCoordinatorStreamProducer(taskName.getTaskName());
   }
 
   /**
@@ -85,9 +64,8 @@ public class ChangelogPartitionManager {
    */
   public Map<TaskName, Integer> readChangeLogPartitionMapping() {
     log.debug("Reading changelog partition information");
-    Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetChangelogMapping.TYPE);
-    HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
-    for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+    final HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
+    for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
       changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
       log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
@@ -104,10 +82,7 @@ public class ChangelogPartitionManager {
     log.debug("Updating changelog information with: ");
     for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
       log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
-      SetChangelogMapping changelogMapping = new SetChangelogMapping(source,
-          entry.getKey().getTaskName(),
-          entry.getValue());
-      coordinatorStreamProducer.send(changelogMapping);
+      send(new SetChangelogMapping(getSource(), entry.getKey().getTaskName(), entry.getValue()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 0dbf14b..ea2eaa5 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -322,4 +322,4 @@ class JobCoordinator(
       info("Stopped checkpoint manager.")
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 1c178a6..d7c928c 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -22,6 +22,7 @@ package org.apache.samza.job
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig, CoordinatorStreamMessage}
 import org.apache.samza.job.ApplicationStatus.Running
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
@@ -32,8 +33,6 @@ import org.apache.samza.config.ConfigException
 import org.apache.samza.config.SystemConfig
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
 import org.apache.samza.system.SystemStream
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
@@ -85,7 +84,7 @@ class JobRunner(config: Config) extends Logging {
     val oldConfig = coordinatorSystemConsumer.getConfig();
     info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
     (oldConfig.keySet -- config.keySet).foreach(key => {
-      coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
+      coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
     })
     coordinatorSystemProducer.stop
 

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index e454593..47a44b1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -27,6 +27,9 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
@@ -76,17 +79,17 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
           HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
           checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
           Checkpoint cp = new Checkpoint(checkpointMap);
-          CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
+          SetCheckpoint setCheckpoint = new SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
           keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
           messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
         } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
           String[] changelogInfo = configPair.getKey().split(":");
           String changeLogPartition = configPair.getValue();
-          CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
+          SetChangelogMapping changelogMapping = new SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
           keyBytes = MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8");
           messgeBytes = MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8");
         } else {
-          CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig("source", configPair.getKey(), configPair.getValue());
+          SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue());
           keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
           messgeBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
index ac26a01..60e2e5d 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.Delete;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.junit.Test;
 
 public class TestCoordinatorStreamMessage {
@@ -48,8 +52,8 @@ public class TestCoordinatorStreamMessage {
 
   @Test
   public void testSetConfig() {
-    CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig("source", "key", "value");
-    assertEquals(CoordinatorStreamMessage.SetConfig.TYPE, setConfig.getType());
+    SetConfig setConfig = new SetConfig("source", "key", "value");
+    assertEquals(SetConfig.TYPE, setConfig.getType());
     assertEquals("key", setConfig.getKey());
     assertEquals("value", setConfig.getConfigValue());
     assertFalse(setConfig.isDelete());
@@ -58,7 +62,7 @@ public class TestCoordinatorStreamMessage {
 
   @Test
   public void testDelete() {
-    CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete("source2", "key", "delete-type");
+    Delete delete = new Delete("source2", "key", "delete-type");
     assertEquals("delete-type", delete.getType());
     assertEquals("key", delete.getKey());
     assertNull(delete.getMessageMap());

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index c25f6a7..370cfb7 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -33,6 +33,9 @@ import java.util.Set;
 
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.Delete;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
@@ -72,9 +75,9 @@ public class TestCoordinatorStreamSystemConsumer {
   private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
     int initialSize = bootstrappedStreamSet.size();
     List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
-    listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order1", "job.name.order1", "my-order1-name"));
-    listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order2", "job.name.order2", "my-order2-name"));
-    listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order3", "job.name.order3", "my-order3-name"));
+    listStreamMessages.add(new SetConfig("order1", "job.name.order1", "my-order1-name"));
+    listStreamMessages.add(new SetConfig("order2", "job.name.order2", "my-order2-name"));
+    listStreamMessages.add(new SetConfig("order3", "job.name.order3", "my-order3-name"));
     bootstrappedStreamSet.addAll(listStreamMessages);
     Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator();
 
@@ -126,9 +129,9 @@ public class TestCoordinatorStreamSystemConsumer {
 
       if (pollCount++ == 0) {
         List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
-        CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig("test", "job.name", "my-job-name");
-        CoordinatorStreamMessage.SetConfig setConfig2 = new CoordinatorStreamMessage.SetConfig("test", "job.id", "1234");
-        CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete("test", "job.name", CoordinatorStreamMessage.SetConfig.TYPE);
+        SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
+        SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
+        Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
         list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
         list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
         list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));

http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 1ef07d0..9e5e06c 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.Delete;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -43,9 +46,9 @@ public class TestCoordinatorStreamSystemProducer {
     MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source);
     MockSystemAdmin systemAdmin = new MockSystemAdmin();
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
-    CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
-    CoordinatorStreamMessage.SetConfig setConfig2 = new CoordinatorStreamMessage.SetConfig(source, "job.id", "1234");
-    CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete(source, "job.name", CoordinatorStreamMessage.SetConfig.TYPE);
+    SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
+    SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
+    Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
     assertFalse(systemProducer.isRegistered());
     producer.register(source);
     assertTrue(systemProducer.isRegistered());