You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/07/17 00:08:29 UTC

samza git commit: Revert "SAMZA-704 : Create a tool to write coordinator stream"

Repository: samza
Updated Branches:
  refs/heads/master a4b831d3d -> 01ee053ed


Revert "SAMZA-704 : Create a tool to write coordinator stream"

This reverts commit a4b831d3d3a555bc3cca2b0819813c6fad8bd480.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/01ee053e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/01ee053e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/01ee053e

Branch: refs/heads/master
Commit: 01ee053ed720832a1ac679525f1aa2b29fe0463e
Parents: a4b831d
Author: Navina <na...@gmail.com>
Authored: Thu Jul 16 15:07:54 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Thu Jul 16 15:07:54 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 -
 checkstyle/import-control.xml.orig              | 183 -------------------
 .../stream/CoordinatorStreamMessage.java        |   8 +-
 .../stream/CoordinatorStreamWriter.java         | 128 -------------
 .../CoordinatorStreamWriterCommandLine.scala    |  71 -------
 .../MockCoordinatorStreamSystemFactory.java     |  90 +++------
 .../TestCoordinatorStreamSystemProducer.java    |  59 +++++-
 .../stream/TestCoordinatorStreamWriter.java     | 166 -----------------
 .../main/bash/run-coordinator-stream-writer.sh  |  21 ---
 9 files changed, 74 insertions(+), 653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6654319..eef3370 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,7 +123,6 @@
 
         <allow class="org.apache.samza.Partition" />
         <allow class="org.apache.samza.SamzaException" />
-        <allow class="joptsimple.OptionSet" />
     </subpackage>
 
     <subpackage name="checkpoint">

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/checkstyle/import-control.xml.orig
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml.orig b/checkstyle/import-control.xml.orig
deleted file mode 100644
index eef3370..0000000
--- a/checkstyle/import-control.xml.orig
+++ /dev/null
@@ -1,183 +0,0 @@
-<!DOCTYPE import-control PUBLIC
-    "-//Puppy Crawl//DTD Import Control 1.1//EN"
-    "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
-<!--
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-// 
-//    http://www.apache.org/licenses/LICENSE-2.0
-// 
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
---> 
-<import-control pkg="org.apache.samza">
-	
-	<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
-	
-	<!-- common library dependencies -->
-	<allow pkg="java" />
-	<allow pkg="javax.management" />
-	<allow pkg="org.slf4j" />
-	<allow pkg="org.junit" />
-    <allow pkg="org.codehaus" />
-    <allow pkg="org.mockito" />
-    <allow pkg="org.apache.log4j" />
-    <allow pkg="org.apache.kafka" />
-
-    <subpackage name="config">
-        <allow class="org.apache.samza.SamzaException" />
-    </subpackage>
-
-    <subpackage name="serializers">
-        <allow pkg="org.apache.samza.config" />
-
-        <subpackage name="model">
-            <allow pkg="org.apache.samza.job.model" />
-            <allow pkg="org.apache.samza.util" />
-
-            <allow class="org.apache.samza.Partition" />
-            <allow class="org.apache.samza.container.TaskName" />
-            <allow class="org.apache.samza.system.SystemStreamPartition" />
-        </subpackage>
-    </subpackage>
-
-    <subpackage name="job">
-        <allow pkg="org.apache.samza.config" />
-
-        <subpackage name="model">
-            <allow class="org.apache.samza.Partition" />
-            <allow class="org.apache.samza.container.TaskName" />
-            <allow class="org.apache.samza.system.SystemStreamPartition" />
-            <allow class="org.apache.samza.container.LocalityManager" />
-        </subpackage>
-    </subpackage>
-
-    <subpackage name="system">
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="org.apache.samza.metrics" />
-        <allow pkg="org.apache.samza.serializers" />
-
-        <allow class="org.apache.samza.Partition" />
-        <allow class="org.apache.samza.SamzaException" />
-
-        <subpackage name="chooser">
-            <allow class="org.apache.samza.system.SystemStreamPartition" />
-            <allow class="org.apache.samza.system.IncomingMessageEnvelope" />
-        </subpackage>
-
-        <subpackage name="mock">
-            <allow pkg="org.apache.samza.system" />
-            <allow pkg="org.apache.samza.util" />
-        </subpackage>
-    </subpackage>
-
-    <subpackage name="util">
-        <allow pkg="org.apache.samza.metrics" />
-        <allow pkg="org.apache.samza.system" />
-
-        <allow class="org.apache.samza.Partition" />
-        <allow class="org.apache.samza.SamzaException" />
-    </subpackage>
-
-    <subpackage name="metrics">
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="org.apache.samza.util" />
-    </subpackage>
-
-    <subpackage name="task">
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="org.apache.samza.container" />
-        <allow pkg="org.apache.samza.metrics" />
-        <allow pkg="org.apache.samza.system" />
-    </subpackage>
-
-    <subpackage name="container">
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="org.apache.samza.coordinator.stream" />
-        <subpackage name="grouper">
-            <subpackage name="stream">
-                <allow pkg="org.apache.samza.container" />
-                <allow pkg="org.apache.samza.system" />
-            </subpackage>
-
-            <subpackage name="task">
-                <allow pkg="org.apache.samza.job" />
-            </subpackage>
-        </subpackage>
-    </subpackage>
-
-    <subpackage name="coordinator">
-        <allow pkg="org.apache.samza.checkpoint" />
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="org.apache.samza.metrics" />
-        <allow pkg="org.apache.samza.system" />
-        <allow pkg="org.apache.samza.serializers" />
-        <allow pkg="org.apache.samza.util" />
-
-        <allow class="org.apache.samza.Partition" />
-        <allow class="org.apache.samza.SamzaException" />
-    </subpackage>
-
-    <subpackage name="checkpoint">
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="org.apache.samza.container" />
-        <allow pkg="org.apache.samza.coordinator" />
-        <allow pkg="org.apache.samza.metrics" />
-        <allow pkg="org.apache.samza.system" />
-
-        <allow class="org.apache.samza.SamzaException" />
-    </subpackage>
-
-    <subpackage name="storage">
-        <allow pkg="org.apache.samza.container" />
-        <allow pkg="org.apache.samza.coordinator" />
-        <allow pkg="org.apache.samza.metrics" />
-        <allow pkg="org.apache.samza.serializers" />
-        <allow pkg="org.apache.samza.system" />
-        <allow pkg="org.apache.samza.task" />
-        <allow pkg="org.apache.samza.util" />
-        <allow pkg="org.apache.samza.job" />
-        <allow pkg="org.apache.samza.config" />
-        <allow pkg="joptsimple" />
-
-        <allow class="org.apache.samza.SamzaException" />
-        <allow class="org.apache.samza.Partition" />
-    </subpackage>
-
-    <subpackage name="logging">
-        <subpackage name="log4j">
-            <allow pkg="org.apache.samza.config" />
-            <allow pkg="org.apache.samza.coordinator" />
-            <allow pkg="org.apache.samza.job" />
-            <allow pkg="org.apache.samza.metrics" />
-            <allow pkg="org.apache.samza.system" />
-            <allow pkg="org.apache.samza.serializers" />
-            <allow pkg="org.apache.samza.util" />
-
-            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" />
-            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" />
-            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory" />
-            <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" />
-            <allow class="org.apache.samza.SamzaException" />
-        </subpackage>
-    </subpackage>
-
-    <subpackage name="test">
-        <subpackage name="integration">
-            <allow pkg="org.apache.samza.config" />
-            <allow pkg="org.apache.samza.container" />
-            <allow pkg="org.apache.samza.system" />
-            <allow pkg="org.apache.samza.storage" />
-            <allow pkg="org.apache.samza.task" />
-            <allow pkg="org.apache.samza.util" />
-        </subpackage>
-    </subpackage>
-
-</import-control>

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index e5ab4fb..6bd1bd3 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -211,7 +211,7 @@ public class CoordinatorStreamMessage {
 
   /**
    * The type of the message is used to convert a generic
-   * CoordinatorStreamMessage into a specific message, such as a SetConfig
+   * CoordinatorStreaMessage into a specific message, such as a SetConfig
    * message.
    *
    * @return The type of the message.
@@ -235,14 +235,14 @@ public class CoordinatorStreamMessage {
   }
 
   /**
-   * @return The username of a message.
+   * @return Whether the message signifies a delete or not.
    */
   public String getUsername() {
     return (String) this.messageMap.get("username");
   }
 
   /**
-   * @return The timestamp of a message.
+   * @return Whether the message signifies a delete or not.
    */
   public long getTimestamp() {
     return (Long) this.messageMap.get("timestamp");
@@ -254,7 +254,7 @@ public class CoordinatorStreamMessage {
   public Map<String, Object> getMessageMap() {
     if (!isDelete) {
       Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
-      // To make sure the values is immutable, we overwrite it with an immutable version of the the values map.
+      // To make sure the values is not immutable, we overwrite it with an immutable version of the the values map.
       immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
       return Collections.unmodifiableMap(immutableMap);
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
deleted file mode 100644
index f769756..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class writes control messages to the CoordinatorStream.
- * To use this class it, first, it should be initialized by the start() method,
- * and then use the sendMessage() function to send all the control messages needed.
- * Finally, the stop() method should be called.
- * The control messages are in the format of a (type, key, value) where:
- * type: defines the kind of message of the control message from the set {set-config}.
- * key: defines a key to associate with the value. This can be null as well for messages with no value
- * value: defines the value being sent along with the message. This can be null as well for messages with no value.
- */
-public class CoordinatorStreamWriter {
-
-  private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);
-  public final static String SOURCE = "coordinator-stream-writer";
-  public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE;
-
-  private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;
-
-
-  public CoordinatorStreamWriter(Config config) {
-    coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
-  }
-
-  /**
-   * This method initializes the writer by starting a coordinator stream producer.
-   */
-  public void start() {
-    coordinatorStreamSystemProducer.register(CoordinatorStreamWriter.SOURCE);
-    coordinatorStreamSystemProducer.start();
-    log.info("Started coordinator stream writer.");
-  }
-
-  /**
-   * This method stops the writer and closes the coordinator stream producer
-   */
-  public void stop() {
-    log.info("Stopping the coordinator stream producer.");
-    coordinatorStreamSystemProducer.stop();
-  }
-
-  /**
-   * This method sends a message to the coordinator stream. This creates a message containing (type,key,value).
-   * For example if you want to set the number of yarn containers to 3, you would use
-   * ("set-config", "yarn.container.count", "3").
-   *
-   * @param type  defines the kind of message of the control message from the set {"set-config"}.
-   * @param key   defines a key to associate with the value.  This can be null for messages with no key or value.
-   * @param value defines the value being sent along with the message. This can be null for messages with no value.
-   */
-  public void sendMessage(String type, String key, String value) {
-    //TODO: validate keys and values
-    if (type.equals(SET_CONFIG_TYPE)) {
-      sendSetConfigMessage(key, value);
-    } else {
-      throw new IllegalArgumentException("Type is invalid. The possible values are {" + SET_CONFIG_TYPE + "}");
-    }
-  }
-
-  /**
-   * This method sends message of type "set-config" to the coordinator stream
-   *
-   * @param key   defines the name of the configuration being set. For example, for setting the number of yarn containers,
-   *              the key is "yarn.container.count"
-   * @param value defines the value associated with the key. For example, if the key is "yarn.container.count" the value
-   *              is the new number of containers.
-   */
-  private void sendSetConfigMessage(String key, String value) {
-    log.info("sent SetConfig message with key = " + key + " and value = " + value);
-    coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
-  }
-
-  /**
-   * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
-   * and sends control messages.
-   * To run the code use the following command:
-   * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh  --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
-   *
-   * @param args input arguments for running the writer. These arguments are:
-   *             "config-factory" = The config file factory
-   *             "config-path" = The path to config file of a job
-   *             "type" = type of the message being written
-   *             "key" = [optional] key of the message being written
-   *             "value" = [optional] value of the message being written
-   */
-  public static void main(String[] args) {
-    CoordinatorStreamWriterCommandLine cmdline = new CoordinatorStreamWriterCommandLine();
-    OptionSet options = cmdline.parser().parse(args);
-    Config config = cmdline.loadConfig(options);
-    String type = cmdline.loadType(options);
-    String key = cmdline.loadKey(options);
-    String value = cmdline.loadValue(options);
-
-    CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config);
-    writer.start();
-    writer.sendMessage(type, key, value);
-    writer.stop();
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
deleted file mode 100644
index 0c17800..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream
-
-import org.apache.samza.util.CommandLine
-import joptsimple.OptionSet
-
-class CoordinatorStreamWriterCommandLine extends CommandLine {
-
-  val messageType =
-    parser.accepts("type", "the type of the message being sent.")
-        .withRequiredArg
-        .ofType(classOf[java.lang.String])
-        .describedAs("Required field. This field is the type of the message being sent." +
-        " The possible values are {\"set-config\"}")
-
-
-  val messageKey =
-    parser.accepts("key", "the type of the message being sent")
-        .withRequiredArg
-        .ofType(classOf[java.lang.String])
-        .describedAs("key of the message")
-
-  val messageValue =
-    parser.accepts("value", "the type of the message being sent")
-        .withRequiredArg
-        .ofType(classOf[java.lang.String])
-        .describedAs("value of the message")
-
-  def loadType(options: OptionSet) = {
-    if (!options.has(messageType)) {
-      parser.printHelpOn(System.err)
-      System.exit(-1)
-    }
-    options.valueOf(messageType)
-  }
-
-  def loadKey(options: OptionSet): java.lang.String = {
-    if (options.has(messageKey)) {
-      options.valueOf(messageKey)
-    } else {
-      null
-    }
-  }
-
-  def loadValue(options: OptionSet) = {
-    var value: java.lang.String = null
-    if (options.has(messageValue)) {
-      value = options.valueOf(messageValue)
-    }
-
-    value
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 84ae0b5..647cadb 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -24,18 +24,14 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.apache.samza.util.Util;
 
-import java.util.ArrayList;
-import java.util.List;
-
-
 /**
  * Helper for creating mock CoordinatorStreamConsumer and
  * CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just
@@ -46,7 +42,6 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
 
   private static SystemConsumer mockConsumer = null;
   private static boolean useCachedConsumer = false;
-
   public static void enableMockConsumerCache() {
     mockConsumer = null;
     useCachedConsumer = true;
@@ -59,10 +54,9 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
 
   /**
    * Returns a consumer that sends all configs to the coordinator stream.
-   *
    * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
    *               The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
-   *               ch:source:taskname -> changelogPartition for changelog
+   *                                       ch:source:taskname -> changelogPartition for changelog
    *               Everything else is processed as normal config
    */
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
@@ -86,10 +80,26 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   }
 
   /**
-   * Returns a MockCoordinatorSystemProducer.
+   * Returns a no-op producer.
    */
   public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
-    return new MockSystemProducer(null);
+    // A do-nothing producer.
+    return new SystemProducer() {
+      public void start() {
+      }
+
+      public void stop() {
+      }
+
+      public void register(String source) {
+      }
+
+      public void send(String source, OutgoingMessageEnvelope envelope) {
+      }
+
+      public void flush(String source) {
+      }
+    };
   }
 
   /**
@@ -105,62 +115,4 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
       // Do nothing.
     }
   }
-
-  protected static class MockSystemProducer implements SystemProducer {
-    private final String expectedSource;
-    private final List<OutgoingMessageEnvelope> envelopes;
-    private boolean started = false;
-    private boolean registered = false;
-    private boolean flushed = false;
-
-    public MockSystemProducer(String expectedSource) {
-      this.expectedSource = expectedSource;
-      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
-    }
-
-
-    public void start() {
-      started = true;
-    }
-
-    public void stop() {
-      started = false;
-    }
-
-    public void register(String source) {
-      registered = true;
-    }
-
-    public void send(String source, OutgoingMessageEnvelope envelope) {
-      envelopes.add(envelope);
-    }
-
-    public void flush(String source) {
-      flushed = true;
-    }
-
-    public List<OutgoingMessageEnvelope> getEnvelopes() {
-      return envelopes;
-    }
-
-    public boolean isStarted() {
-      return started;
-    }
-
-    public boolean isStopped() {
-      return !started;
-    }
-
-    public boolean isRegistered() {
-      return registered;
-    }
-
-    public boolean isFlushed() {
-      return flushed;
-    }
-
-    public String getExpectedSource() {
-      return expectedSource;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 1ef07d0..68e3255 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -30,6 +31,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.codehaus.jackson.type.TypeReference;
@@ -40,7 +42,7 @@ public class TestCoordinatorStreamSystemProducer {
   public void testCoordinatorStreamSystemProducer() {
     String source = "source";
     SystemStream systemStream = new SystemStream("system", "stream");
-    MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source);
+    MockSystemProducer systemProducer = new MockSystemProducer(source);
     MockSystemAdmin systemAdmin = new MockSystemAdmin();
     CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
     CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
@@ -91,22 +93,59 @@ public class TestCoordinatorStreamSystemProducer {
     }
   }
 
-  private static class MockCoordinatorSystemProducer extends MockCoordinatorStreamSystemFactory.MockSystemProducer {
+  private static class MockSystemProducer implements SystemProducer {
+    private final String expectedSource;
+    private final List<OutgoingMessageEnvelope> envelopes;
+    private boolean started = false;
+    private boolean stopped = false;
+    private boolean registered = false;
+    private boolean flushed = false;
 
-    public MockCoordinatorSystemProducer(String expectedSource) {
-      super(expectedSource);
+    public MockSystemProducer(String expectedSource) {
+      this.expectedSource = expectedSource;
+      this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+    }
+
+    public void start() {
+      started = true;
+    }
+
+    public void stop() {
+      stopped = true;
     }
 
-    @Override
     public void register(String source) {
-      assertEquals(super.getExpectedSource(), source);
-      super.register(source);
+      assertEquals(expectedSource, source);
+      registered = true;
+    }
+
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      envelopes.add(envelope);
     }
 
-    @Override
     public void flush(String source) {
-      assertEquals(super.getExpectedSource(), source);
-      super.flush(source);
+      assertEquals(expectedSource, source);
+      flushed = true;
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopes() {
+      return envelopes;
+    }
+
+    public boolean isStarted() {
+      return started;
+    }
+
+    public boolean isStopped() {
+      return stopped;
+    }
+
+    public boolean isRegistered() {
+      return registered;
+    }
+
+    public boolean isFlushed() {
+      return flushed;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
deleted file mode 100644
index c484660..0000000
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.codehaus.jackson.type.TypeReference;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-/**
- * This class is a unit test for the CoordinatorStreamWriter class.
- */
-public class TestCoordinatorStreamWriter {
-
-  private CoordinatorStreamWriter coordinatorStreamWriter;
-  private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer;
-
-  @Test
-  public void testCoordinatorStream() {
-
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
-    configMap.put("job.name", "coordinator-stream-writer-test");
-    Config config = new MapConfig(configMap);
-    coordinatorStreamWriter = new CoordinatorStreamWriter(config);
-    boolean exceptionHappened = false;
-
-    try {
-
-      //get coordinator system producer
-      Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer");
-      coordinatorProducerField.setAccessible(true);
-      assertNotNull(coordinatorProducerField.get(coordinatorStreamWriter));
-      CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter);
-
-      //get mock system producer
-      Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer");
-      systemProducerField.setAccessible(true);
-      systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer);
-
-      testStart();
-      testSendMessage();
-      testStop();
-
-
-    } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
-      e.printStackTrace();
-      exceptionHappened = true;
-    }
-
-    assertFalse(exceptionHappened);
-
-
-  }
-
-
-  public void testStart() throws NoSuchFieldException, IllegalAccessException {
-
-    //checks before starting
-    assertFalse(systemProducer.isStarted());
-
-    //start and check if start has been done successfully
-    coordinatorStreamWriter.start();
-    assertTrue(systemProducer.isStarted());
-
-  }
-
-  public void testStop() throws NoSuchFieldException, IllegalAccessException {
-
-    //checks before stopping
-    assertTrue(systemProducer.isStarted());
-
-    //stop and check if stop has been done correctly
-    coordinatorStreamWriter.stop();
-    assertTrue(systemProducer.isStopped());
-  }
-
-  public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-
-    //check a correct message
-    assertEquals(0, systemProducer.getEnvelopes().size());
-    coordinatorStreamWriter.sendMessage("set-config", "key0", "value0");
-    assertEquals(1, systemProducer.getEnvelopes().size());
-
-    //check invalid input is handled
-    boolean exceptionHappened = false;
-    try {
-      coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid");
-    } catch (IllegalArgumentException e) {
-      exceptionHappened = true;
-    }
-    assertTrue(exceptionHappened);
-    assertEquals(1, systemProducer.getEnvelopes().size());
-
-
-    //check sendSetConfigMessage method works correctly
-    Class[] sendArgs = {String.class, String.class};
-    Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", sendArgs);
-    sendSetConfigMethod.setAccessible(true);
-    sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1");
-    assertEquals(2, systemProducer.getEnvelopes().size());
-
-
-    //check the messages are correct
-    List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
-    OutgoingMessageEnvelope envelope0 = envelopes.get(0);
-    OutgoingMessageEnvelope envelope1 = envelopes.get(1);
-    TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {
-    };
-    TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {
-    };
-    assertEquals(2, envelopes.size());
-
-    assertEquals("key0", deserialize((byte[]) envelope0.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]);
-    Map<String, String> values = (Map<String, String>) deserialize((byte[]) envelope0.getMessage(), msgRef).get("values");
-    assertEquals("value0", values.get("value"));
-
-    assertEquals("key1", deserialize((byte[]) envelope1.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]);
-    values = (Map<String, String>) deserialize((byte[]) envelope1.getMessage(), msgRef).get("values");
-    assertEquals("value1", values.get("value"));
-  }
-
-  private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
-    try {
-      if (bytes != null) {
-        String valueStr = new String((byte[]) bytes, "UTF-8");
-        return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
-      }
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-
-    return null;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
deleted file mode 100644
index d2249dd..0000000
--- a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-
-exec $(dirname $0)/run-class.sh org.apache.samza.coordinator.stream.CoordinatorStreamWriter "$@"