You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/07/16 23:23:51 UTC

samza git commit: SAMZA-704 : Create a tool to write coordinator stream

Repository: samza
Updated Branches:
  refs/heads/master 81542ecf4 -> a4b831d3d


SAMZA-704 : Create a tool to write coordinator stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a4b831d3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a4b831d3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a4b831d3

Branch: refs/heads/master
Commit: a4b831d3d3a555bc3cca2b0819813c6fad8bd480
Parents: 81542ec
Author: Navina <na...@gmail.com>
Authored: Thu Jul 16 14:20:57 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Thu Jul 16 14:20:57 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 checkstyle/import-control.xml.orig              | 183 +++++++++++++++++++
 .../stream/CoordinatorStreamMessage.java        |   8 +-
 .../stream/CoordinatorStreamWriter.java         | 128 +++++++++++++
 .../CoordinatorStreamWriterCommandLine.scala    |  71 +++++++
 .../MockCoordinatorStreamSystemFactory.java     |  90 ++++++---
 .../TestCoordinatorStreamSystemProducer.java    |  59 +-----
 .../stream/TestCoordinatorStreamWriter.java     | 166 +++++++++++++++++
 .../main/bash/run-coordinator-stream-writer.sh  |  21 +++
 9 files changed, 653 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index eef3370..6654319 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,6 +123,7 @@
 
         <allow class="org.apache.samza.Partition" />
         <allow class="org.apache.samza.SamzaException" />
+        <allow class="joptsimple.OptionSet" />
     </subpackage>
 
     <subpackage name="checkpoint">

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/checkstyle/import-control.xml.orig
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml.orig b/checkstyle/import-control.xml.orig
new file mode 100644
index 0000000..eef3370
--- /dev/null
+++ b/checkstyle/import-control.xml.orig
@@ -0,0 +1,183 @@
+<!DOCTYPE import-control PUBLIC
+    "-//Puppy Crawl//DTD Import Control 1.1//EN"
+    "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// 
+//    http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+--> 
+<import-control pkg="org.apache.samza">
+	
+	<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+	
+	<!-- common library dependencies -->
+	<allow pkg="java" />
+	<allow pkg="javax.management" />
+	<allow pkg="org.slf4j" />
+	<allow pkg="org.junit" />
+    <allow pkg="org.codehaus" />
+    <allow pkg="org.mockito" />
+    <allow pkg="org.apache.log4j" />
+    <allow pkg="org.apache.kafka" />
+
+    <subpackage name="config">
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="serializers">
+        <allow pkg="org.apache.samza.config" />
+
+        <subpackage name="model">
+            <allow pkg="org.apache.samza.job.model" />
+            <allow pkg="org.apache.samza.util" />
+
+            <allow class="org.apache.samza.Partition" />
+            <allow class="org.apache.samza.container.TaskName" />
+            <allow class="org.apache.samza.system.SystemStreamPartition" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="job">
+        <allow pkg="org.apache.samza.config" />
+
+        <subpackage name="model">
+            <allow class="org.apache.samza.Partition" />
+            <allow class="org.apache.samza.container.TaskName" />
+            <allow class="org.apache.samza.system.SystemStreamPartition" />
+            <allow class="org.apache.samza.container.LocalityManager" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="system">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.serializers" />
+
+        <allow class="org.apache.samza.Partition" />
+        <allow class="org.apache.samza.SamzaException" />
+
+        <subpackage name="chooser">
+            <allow class="org.apache.samza.system.SystemStreamPartition" />
+            <allow class="org.apache.samza.system.IncomingMessageEnvelope" />
+        </subpackage>
+
+        <subpackage name="mock">
+            <allow pkg="org.apache.samza.system" />
+            <allow pkg="org.apache.samza.util" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="util">
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+
+        <allow class="org.apache.samza.Partition" />
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="metrics">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.util" />
+    </subpackage>
+
+    <subpackage name="task">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+    </subpackage>
+
+    <subpackage name="container">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.coordinator.stream" />
+        <subpackage name="grouper">
+            <subpackage name="stream">
+                <allow pkg="org.apache.samza.container" />
+                <allow pkg="org.apache.samza.system" />
+            </subpackage>
+
+            <subpackage name="task">
+                <allow pkg="org.apache.samza.job" />
+            </subpackage>
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="coordinator">
+        <allow pkg="org.apache.samza.checkpoint" />
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.serializers" />
+        <allow pkg="org.apache.samza.util" />
+
+        <allow class="org.apache.samza.Partition" />
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="checkpoint">
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.apache.samza.coordinator" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.system" />
+
+        <allow class="org.apache.samza.SamzaException" />
+    </subpackage>
+
+    <subpackage name="storage">
+        <allow pkg="org.apache.samza.container" />
+        <allow pkg="org.apache.samza.coordinator" />
+        <allow pkg="org.apache.samza.metrics" />
+        <allow pkg="org.apache.samza.serializers" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.task" />
+        <allow pkg="org.apache.samza.util" />
+        <allow pkg="org.apache.samza.job" />
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="joptsimple" />
+
+        <allow class="org.apache.samza.SamzaException" />
+        <allow class="org.apache.samza.Partition" />
+    </subpackage>
+
+    <subpackage name="logging">
+        <subpackage name="log4j">
+            <allow pkg="org.apache.samza.config" />
+            <allow pkg="org.apache.samza.coordinator" />
+            <allow pkg="org.apache.samza.job" />
+            <allow pkg="org.apache.samza.metrics" />
+            <allow pkg="org.apache.samza.system" />
+            <allow pkg="org.apache.samza.serializers" />
+            <allow pkg="org.apache.samza.util" />
+
+            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" />
+            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" />
+            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory" />
+            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" />
+            <allow class="org.apache.samza.SamzaException" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="test">
+        <subpackage name="integration">
+            <allow pkg="org.apache.samza.config" />
+            <allow pkg="org.apache.samza.container" />
+            <allow pkg="org.apache.samza.system" />
+            <allow pkg="org.apache.samza.storage" />
+            <allow pkg="org.apache.samza.task" />
+            <allow pkg="org.apache.samza.util" />
+        </subpackage>
+    </subpackage>
+
+</import-control>

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index 6bd1bd3..e5ab4fb 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -211,7 +211,7 @@ public class CoordinatorStreamMessage {
 
   /**
    * The type of the message is used to convert a generic
-   * CoordinatorStreaMessage into a specific message, such as a SetConfig
+   * CoordinatorStreamMessage into a specific message, such as a SetConfig
    * message.
    *
    * @return The type of the message.
@@ -235,14 +235,14 @@ public class CoordinatorStreamMessage {
   }
 
   /**
-   * @return Whether the message signifies a delete or not.
+   * @return The username of a message.
    */
   public String getUsername() {
     return (String) this.messageMap.get("username");
   }
 
   /**
-   * @return Whether the message signifies a delete or not.
+   * @return The timestamp of a message.
    */
   public long getTimestamp() {
     return (Long) this.messageMap.get("timestamp");
@@ -254,7 +254,7 @@ public class CoordinatorStreamMessage {
   public Map<String, Object> getMessageMap() {
     if (!isDelete) {
       Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
-      // To make sure the values is not immutable, we overwrite it with an immutable version of the the values map.
+      // To make sure the values map is immutable, we overwrite it with an immutable version of the values map.
       immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
       return Collections.unmodifiableMap(immutableMap);
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
new file mode 100644
index 0000000..f769756
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class writes control messages to the CoordinatorStream.
+ * To use this class, first initialize it by calling the start() method,
+ * then call sendMessage() once for each control message that needs to be sent.
+ * Finally, call the stop() method.
+ * Each control message has the form (type, key, value) where:
+ * type: defines the kind of control message; it must be one of {set-config}.
+ * key: defines a key to associate with the value. This can be null for messages with no key or value.
+ * value: defines the value being sent along with the message. This can be null for messages with no value.
+ */
+public class CoordinatorStreamWriter {
+
+  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);
+  public final static String SOURCE = "coordinator-stream-writer";
+  public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE;
+
+  private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;
+
+
+  public CoordinatorStreamWriter(Config config) {
+    coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
+  }
+
+  /**
+   * This method initializes the writer by starting a coordinator stream producer.
+   */
+  public void start() {
+    coordinatorStreamSystemProducer.register(CoordinatorStreamWriter.SOURCE);
+    coordinatorStreamSystemProducer.start();
+    log.info("Started coordinator stream writer.");
+  }
+
+  /**
+   * This method stops the writer and closes the coordinator stream producer
+   */
+  public void stop() {
+    log.info("Stopping the coordinator stream producer.");
+    coordinatorStreamSystemProducer.stop();
+  }
+
+  /**
+   * This method sends a message to the coordinator stream. This creates a message containing (type,key,value).
+   * For example if you want to set the number of yarn containers to 3, you would use
+   * ("set-config", "yarn.container.count", "3").
+   *
+   * @param type  defines the kind of control message; it must be one of {"set-config"}.
+   * @param key   defines a key to associate with the value.  This can be null for messages with no key or value.
+   * @param value defines the value being sent along with the message. This can be null for messages with no value.
+   */
+  public void sendMessage(String type, String key, String value) {
+    //TODO: validate keys and values
+    if (type.equals(SET_CONFIG_TYPE)) {
+      sendSetConfigMessage(key, value);
+    } else {
+      throw new IllegalArgumentException("Type is invalid. The possible values are {" + SET_CONFIG_TYPE + "}");
+    }
+  }
+
+  /**
+   * This method sends a message of type "set-config" to the coordinator stream.
+   *
+   * @param key   defines the name of the configuration being set. For example, for setting the number of yarn containers,
+   *              the key is "yarn.container.count"
+   * @param value defines the value associated with the key. For example, if the key is "yarn.container.count" the value
+   *              is the new number of containers.
+   */
+  private void sendSetConfigMessage(String key, String value) {
+    log.info("sent SetConfig message with key = " + key + " and value = " + value);
+    coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
+  }
+
+  /**
+   * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
+   * and sends control messages.
+   * To run the code use the following command:
+   * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh  --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
+   *
+   * @param args input arguments for running the writer. These arguments are:
+   *             "config-factory" = The config file factory
+   *             "config-path" = The path to config file of a job
+   *             "type" = type of the message being written
+   *             "key" = [optional] key of the message being written
+   *             "value" = [optional] value of the message being written
+   */
+  public static void main(String[] args) {
+    CoordinatorStreamWriterCommandLine cmdline = new CoordinatorStreamWriterCommandLine();
+    OptionSet options = cmdline.parser().parse(args);
+    Config config = cmdline.loadConfig(options);
+    String type = cmdline.loadType(options);
+    String key = cmdline.loadKey(options);
+    String value = cmdline.loadValue(options);
+
+    CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config);
+    writer.start();
+    writer.sendMessage(type, key, value);
+    writer.stop();
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
new file mode 100644
index 0000000..0c17800
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream
+
+import org.apache.samza.util.CommandLine
+import joptsimple.OptionSet
+
+class CoordinatorStreamWriterCommandLine extends CommandLine {
+
+  val messageType =
+    parser.accepts("type", "the type of the message being sent.")
+        .withRequiredArg
+        .ofType(classOf[java.lang.String])
+        .describedAs("Required field. This field is the type of the message being sent." +
+        " The possible values are {\"set-config\"}")
+
+
+  val messageKey =
+    parser.accepts("key", "the type of the message being sent")
+        .withRequiredArg
+        .ofType(classOf[java.lang.String])
+        .describedAs("key of the message")
+
+  val messageValue =
+    parser.accepts("value", "the type of the message being sent")
+        .withRequiredArg
+        .ofType(classOf[java.lang.String])
+        .describedAs("value of the message")
+
+  def loadType(options: OptionSet) = {
+    if (!options.has(messageType)) {
+      parser.printHelpOn(System.err)
+      System.exit(-1)
+    }
+    options.valueOf(messageType)
+  }
+
+  def loadKey(options: OptionSet): java.lang.String = {
+    if (options.has(messageKey)) {
+      options.valueOf(messageKey)
+    } else {
+      null
+    }
+  }
+
+  def loadValue(options: OptionSet) = {
+    var value: java.lang.String = null
+    if (options.has(messageValue)) {
+      value = options.valueOf(messageValue)
+    }
+
+    value
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 647cadb..84ae0b5 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -24,14 +24,18 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.apache.samza.util.Util;
 
+import java.util.ArrayList;
+import java.util.List;
+
+
 /**
  * Helper for creating mock CoordinatorStreamConsumer and
  * CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just
@@ -42,6 +46,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
 
   private static SystemConsumer mockConsumer = null;
   private static boolean useCachedConsumer = false;
+
   public static void enableMockConsumerCache() {
     mockConsumer = null;
     useCachedConsumer = true;
@@ -54,9 +59,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
 
   /**
    * Returns a consumer that sends all configs to the coordinator stream.
+   *
    * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
    *               The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
-   *                                       ch:source:taskname -> changelogPartition for changelog
+   *               ch:source:taskname -> changelogPartition for changelog
    *               Everything else is processed as normal config
    */
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
@@ -80,26 +86,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   }
 
   /**
-   * Returns a no-op producer.
+   * Returns a MockSystemProducer, which records every envelope sent to it.
    */
   public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
-    // A do-nothing producer.
-    return new SystemProducer() {
-      public void start() {
-      }
-
-      public void stop() {
-      }
-
-      public void register(String source) {
-      }
-
-      public void send(String source, OutgoingMessageEnvelope envelope) {
-      }
-
-      public void flush(String source) {
-      }
-    };
+    return new MockSystemProducer(null);
   }
 
   /**
@@ -115,4 +105,62 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
       // Do nothing.
     }
   }
+
+  protected static class MockSystemProducer implements SystemProducer {
+    private final String expectedSource;
+    private final List<OutgoingMessageEnvelope> envelopes;
+    private boolean started = false;
+    private boolean registered = false;
+    private boolean flushed = false;
+
+    public MockSystemProducer(String expectedSource) {
+      this.expectedSource = expectedSource;
+      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+    }
+
+
+    public void start() {
+      started = true;
+    }
+
+    public void stop() {
+      started = false;
+    }
+
+    public void register(String source) {
+      registered = true;
+    }
+
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      envelopes.add(envelope);
+    }
+
+    public void flush(String source) {
+      flushed = true;
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopes() {
+      return envelopes;
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    public boolean isStopped() {
+      return !started;
+    }
+
+    public boolean isRegistered() {
+      return registered;
+    }
+
+    public boolean isFlushed() {
+      return flushed;
+    }
+
+    public String getExpectedSource() {
+      return expectedSource;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 68e3255..1ef07d0 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -31,7 +30,6 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.codehaus.jackson.type.TypeReference;
@@ -42,7 +40,7 @@ public class TestCoordinatorStreamSystemProducer {
   public void testCoordinatorStreamSystemProducer() {
     String source = "source";
     SystemStream systemStream = new SystemStream("system", "stream");
-    MockSystemProducer systemProducer = new MockSystemProducer(source);
+    MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source);
     MockSystemAdmin systemAdmin = new MockSystemAdmin();
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
     CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
@@ -93,59 +91,22 @@ public class TestCoordinatorStreamSystemProducer {
     }
   }
 
-  private static class MockSystemProducer implements SystemProducer {
-    private final String expectedSource;
-    private final List<OutgoingMessageEnvelope> envelopes;
-    private boolean started = false;
-    private boolean stopped = false;
-    private boolean registered = false;
-    private boolean flushed = false;
+  private static class MockCoordinatorSystemProducer extends MockCoordinatorStreamSystemFactory.MockSystemProducer {
 
-    public MockSystemProducer(String expectedSource) {
-      this.expectedSource = expectedSource;
-      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
-    }
-
-    public void start() {
-      started = true;
-    }
-
-    public void stop() {
-      stopped = true;
+    public MockCoordinatorSystemProducer(String expectedSource) {
+      super(expectedSource);
     }
 
+    @Override
     public void register(String source) {
-      assertEquals(expectedSource, source);
-      registered = true;
-    }
-
-    public void send(String source, OutgoingMessageEnvelope envelope) {
-      envelopes.add(envelope);
+      assertEquals(super.getExpectedSource(), source);
+      super.register(source);
     }
 
+    @Override
     public void flush(String source) {
-      assertEquals(expectedSource, source);
-      flushed = true;
-    }
-
-    public List<OutgoingMessageEnvelope> getEnvelopes() {
-      return envelopes;
-    }
-
-    public boolean isStarted() {
-      return started;
-    }
-
-    public boolean isStopped() {
-      return stopped;
-    }
-
-    public boolean isRegistered() {
-      return registered;
-    }
-
-    public boolean isFlushed() {
-      return flushed;
+      assertEquals(super.getExpectedSource(), source);
+      super.flush(source);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
new file mode 100644
index 0000000..c484660
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit test for {@link CoordinatorStreamWriter}.
+ *
+ * <p>The writer is configured to use {@code MockCoordinatorStreamSystemFactory}; the mock
+ * producer it ends up holding is extracted reflectively so the test can observe the start,
+ * send and stop calls the writer makes.
+ */
+public class TestCoordinatorStreamWriter {
+
+  private CoordinatorStreamWriter coordinatorStreamWriter;
+  private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer;
+
+  /**
+   * Drives one writer through start, message sending and stop, in that order.
+   *
+   * <p>The steps share the writer and mock producer stored in the fields above, which is
+   * why they are called from this single test instead of being annotated individually.
+   *
+   * @throws Exception if reflective access to the writer internals fails; it is propagated
+   *                   so the test report shows the cause rather than a bare assertion failure
+   */
+  @Test
+  public void testCoordinatorStream() throws Exception {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
+    configMap.put("job.name", "coordinator-stream-writer-test");
+    Config config = new MapConfig(configMap);
+    coordinatorStreamWriter = new CoordinatorStreamWriter(config);
+
+    //get coordinator system producer
+    Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer");
+    coordinatorProducerField.setAccessible(true);
+    CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter);
+    assertNotNull(coordinatorStreamSystemProducer);
+
+    //get mock system producer
+    Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer");
+    systemProducerField.setAccessible(true);
+    systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer);
+
+    testStart();
+    testSendMessage();
+    testStop();
+  }
+
+  /** Step of {@link #testCoordinatorStream()}: starting the writer must start the producer. */
+  public void testStart() throws NoSuchFieldException, IllegalAccessException {
+    //checks before starting
+    assertFalse(systemProducer.isStarted());
+
+    //start and check if start has been done successfully
+    coordinatorStreamWriter.start();
+    assertTrue(systemProducer.isStarted());
+  }
+
+  /** Step of {@link #testCoordinatorStream()}: stopping the writer must stop the producer. */
+  public void testStop() throws NoSuchFieldException, IllegalAccessException {
+    //checks before stopping
+    assertTrue(systemProducer.isStarted());
+
+    //stop and check if stop has been done correctly
+    coordinatorStreamWriter.stop();
+    assertTrue(systemProducer.isStopped());
+  }
+
+  /**
+   * Step of {@link #testCoordinatorStream()}: set-config messages must reach the producer
+   * with the given key and value, and an unknown message type must be rejected unsent.
+   */
+  public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    //check a correct message
+    assertEquals(0, systemProducer.getEnvelopes().size());
+    coordinatorStreamWriter.sendMessage("set-config", "key0", "value0");
+    assertEquals(1, systemProducer.getEnvelopes().size());
+
+    //check invalid input is handled
+    boolean exceptionHappened = false;
+    try {
+      coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid");
+    } catch (IllegalArgumentException e) {
+      exceptionHappened = true;
+    }
+    assertTrue(exceptionHappened);
+    assertEquals(1, systemProducer.getEnvelopes().size());
+
+    //check sendSetConfigMessage method works correctly
+    Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", String.class, String.class);
+    sendSetConfigMethod.setAccessible(true);
+    sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1");
+    assertEquals(2, systemProducer.getEnvelopes().size());
+
+    //check the messages are correct
+    List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
+    assertEquals(2, envelopes.size());
+    assertSetConfig("key0", "value0", envelopes.get(0));
+    assertSetConfig("key1", "value1", envelopes.get(1));
+  }
+
+  /** Asserts that the envelope's decoded key and "values" map carry the given config entry. */
+  @SuppressWarnings("unchecked") // the test expects "values" to decode to a map of strings
+  private void assertSetConfig(String expectedKey, String expectedValue, OutgoingMessageEnvelope envelope) {
+    Object[] key = deserialize((byte[]) envelope.getKey(), new TypeReference<Object[]>() { });
+    Map<String, Object> message = deserialize((byte[]) envelope.getMessage(), new TypeReference<Map<String, Object>>() { });
+    Map<String, String> values = (Map<String, String>) message.get("values");
+    assertEquals(expectedKey, key[CoordinatorStreamMessage.KEY_INDEX]);
+    assertEquals(expectedValue, values.get("value"));
+  }
+
+  /**
+   * Decodes UTF-8 JSON bytes into the requested type using the Samza object mapper.
+   *
+   * @param bytes     serialized payload, may be {@code null}
+   * @param reference type token describing the target type
+   * @return the decoded value, or {@code null} when {@code bytes} is {@code null}
+   * @throws SamzaException wrapping any decoding or parsing failure
+   */
+  private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
+    if (bytes == null) {
+      return null;
+    }
+    try {
+      return SamzaObjectMapper.getObjectMapper().readValue(new String(bytes, "UTF-8"), reference);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
new file mode 100644
index 0000000..d2249dd
--- /dev/null
+++ b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Default to console logging unless the caller already picked a log4j configuration.
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname "$0")/log4j-console.xml"
+# Quote $0 and the resolved path so dirname and exec survive whitespace in the script location.
+exec "$(dirname "$0")/run-class.sh" org.apache.samza.coordinator.stream.CoordinatorStreamWriter "$@"