You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/07/16 23:23:51 UTC
samza git commit: SAMZA-704 : Create a tool to write coordinator
stream
Repository: samza
Updated Branches:
refs/heads/master 81542ecf4 -> a4b831d3d
SAMZA-704 : Create a tool to write coordinator stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a4b831d3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a4b831d3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a4b831d3
Branch: refs/heads/master
Commit: a4b831d3d3a555bc3cca2b0819813c6fad8bd480
Parents: 81542ec
Author: Navina <na...@gmail.com>
Authored: Thu Jul 16 14:20:57 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Thu Jul 16 14:20:57 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 +
checkstyle/import-control.xml.orig | 183 +++++++++++++++++++
.../stream/CoordinatorStreamMessage.java | 8 +-
.../stream/CoordinatorStreamWriter.java | 128 +++++++++++++
.../CoordinatorStreamWriterCommandLine.scala | 71 +++++++
.../MockCoordinatorStreamSystemFactory.java | 90 ++++++---
.../TestCoordinatorStreamSystemProducer.java | 59 +-----
.../stream/TestCoordinatorStreamWriter.java | 166 +++++++++++++++++
.../main/bash/run-coordinator-stream-writer.sh | 21 +++
9 files changed, 653 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index eef3370..6654319 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,6 +123,7 @@
<allow class="org.apache.samza.Partition" />
<allow class="org.apache.samza.SamzaException" />
+ <allow class="joptsimple.OptionSet" />
</subpackage>
<subpackage name="checkpoint">
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/checkstyle/import-control.xml.orig
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml.orig b/checkstyle/import-control.xml.orig
new file mode 100644
index 0000000..eef3370
--- /dev/null
+++ b/checkstyle/import-control.xml.orig
@@ -0,0 +1,183 @@
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.1//EN"
+ "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+-->
+<import-control pkg="org.apache.samza">
+
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
+ <allow pkg="org.codehaus" />
+ <allow pkg="org.mockito" />
+ <allow pkg="org.apache.log4j" />
+ <allow pkg="org.apache.kafka" />
+
+ <subpackage name="config">
+ <allow class="org.apache.samza.SamzaException" />
+ </subpackage>
+
+ <subpackage name="serializers">
+ <allow pkg="org.apache.samza.config" />
+
+ <subpackage name="model">
+ <allow pkg="org.apache.samza.job.model" />
+ <allow pkg="org.apache.samza.util" />
+
+ <allow class="org.apache.samza.Partition" />
+ <allow class="org.apache.samza.container.TaskName" />
+ <allow class="org.apache.samza.system.SystemStreamPartition" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="job">
+ <allow pkg="org.apache.samza.config" />
+
+ <subpackage name="model">
+ <allow class="org.apache.samza.Partition" />
+ <allow class="org.apache.samza.container.TaskName" />
+ <allow class="org.apache.samza.system.SystemStreamPartition" />
+ <allow class="org.apache.samza.container.LocalityManager" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="system">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.serializers" />
+
+ <allow class="org.apache.samza.Partition" />
+ <allow class="org.apache.samza.SamzaException" />
+
+ <subpackage name="chooser">
+ <allow class="org.apache.samza.system.SystemStreamPartition" />
+ <allow class="org.apache.samza.system.IncomingMessageEnvelope" />
+ </subpackage>
+
+ <subpackage name="mock">
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.util" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="util">
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.system" />
+
+ <allow class="org.apache.samza.Partition" />
+ <allow class="org.apache.samza.SamzaException" />
+ </subpackage>
+
+ <subpackage name="metrics">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.util" />
+ </subpackage>
+
+ <subpackage name="task">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.container" />
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.system" />
+ </subpackage>
+
+ <subpackage name="container">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.coordinator.stream" />
+ <subpackage name="grouper">
+ <subpackage name="stream">
+ <allow pkg="org.apache.samza.container" />
+ <allow pkg="org.apache.samza.system" />
+ </subpackage>
+
+ <subpackage name="task">
+ <allow pkg="org.apache.samza.job" />
+ </subpackage>
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="coordinator">
+ <allow pkg="org.apache.samza.checkpoint" />
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.serializers" />
+ <allow pkg="org.apache.samza.util" />
+
+ <allow class="org.apache.samza.Partition" />
+ <allow class="org.apache.samza.SamzaException" />
+ </subpackage>
+
+ <subpackage name="checkpoint">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.container" />
+ <allow pkg="org.apache.samza.coordinator" />
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.system" />
+
+ <allow class="org.apache.samza.SamzaException" />
+ </subpackage>
+
+ <subpackage name="storage">
+ <allow pkg="org.apache.samza.container" />
+ <allow pkg="org.apache.samza.coordinator" />
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.serializers" />
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.task" />
+ <allow pkg="org.apache.samza.util" />
+ <allow pkg="org.apache.samza.job" />
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="joptsimple" />
+
+ <allow class="org.apache.samza.SamzaException" />
+ <allow class="org.apache.samza.Partition" />
+ </subpackage>
+
+ <subpackage name="logging">
+ <subpackage name="log4j">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.coordinator" />
+ <allow pkg="org.apache.samza.job" />
+ <allow pkg="org.apache.samza.metrics" />
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.serializers" />
+ <allow pkg="org.apache.samza.util" />
+
+ <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" />
+ <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" />
+ <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory" />
+ <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" />
+ <allow class="org.apache.samza.SamzaException" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="test">
+ <subpackage name="integration">
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="org.apache.samza.container" />
+ <allow pkg="org.apache.samza.system" />
+ <allow pkg="org.apache.samza.storage" />
+ <allow pkg="org.apache.samza.task" />
+ <allow pkg="org.apache.samza.util" />
+ </subpackage>
+ </subpackage>
+
+</import-control>
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index 6bd1bd3..e5ab4fb 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -211,7 +211,7 @@ public class CoordinatorStreamMessage {
/**
* The type of the message is used to convert a generic
- * CoordinatorStreaMessage into a specific message, such as a SetConfig
+ * CoordinatorStreamMessage into a specific message, such as a SetConfig
* message.
*
* @return The type of the message.
@@ -235,14 +235,14 @@ public class CoordinatorStreamMessage {
}
/**
- * @return Whether the message signifies a delete or not.
+ * @return The username of a message.
*/
public String getUsername() {
return (String) this.messageMap.get("username");
}
/**
- * @return Whether the message signifies a delete or not.
+ * @return The timestamp of a message.
*/
public long getTimestamp() {
return (Long) this.messageMap.get("timestamp");
@@ -254,7 +254,7 @@ public class CoordinatorStreamMessage {
public Map<String, Object> getMessageMap() {
if (!isDelete) {
Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
- // To make sure the values is not immutable, we overwrite it with an immutable version of the the values map.
+ // To make sure the values is immutable, we overwrite it with an immutable version of the the values map.
immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
return Collections.unmodifiableMap(immutableMap);
} else {
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
new file mode 100644
index 0000000..f769756
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import joptsimple.OptionSet;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class writes control messages to the CoordinatorStream.
+ * To use this class it, first, it should be initialized by the start() method,
+ * and then use the sendMessage() function to send all the control messages needed.
+ * Finally, the stop() method should be called.
+ * The control messages are in the format of a (type, key, value) where:
+ * type: defines the kind of message of the control message from the set {set-config}.
+ * key: defines a key to associate with the value. This can be null as well for messages with no value
+ * value: defines the value being sent along with the message. This can be null as well for messages with no value.
+ */
+public class CoordinatorStreamWriter {
+
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);
+ public final static String SOURCE = "coordinator-stream-writer";
+ public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE;
+
+ private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;
+
+
+ public CoordinatorStreamWriter(Config config) {
+ coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
+ }
+
+ /**
+ * This method initializes the writer by starting a coordinator stream producer.
+ */
+ public void start() {
+ coordinatorStreamSystemProducer.register(CoordinatorStreamWriter.SOURCE);
+ coordinatorStreamSystemProducer.start();
+ log.info("Started coordinator stream writer.");
+ }
+
+ /**
+ * This method stops the writer and closes the coordinator stream producer
+ */
+ public void stop() {
+ log.info("Stopping the coordinator stream producer.");
+ coordinatorStreamSystemProducer.stop();
+ }
+
+ /**
+ * This method sends a message to the coordinator stream. This creates a message containing (type,key,value).
+ * For example if you want to set the number of yarn containers to 3, you would use
+ * ("set-config", "yarn.container.count", "3").
+ *
+ * @param type defines the kind of message of the control message from the set {"set-config"}.
+ * @param key defines a key to associate with the value. This can be null for messages with no key or value.
+ * @param value defines the value being sent along with the message. This can be null for messages with no value.
+ */
+ public void sendMessage(String type, String key, String value) {
+ //TODO: validate keys and values
+ if (type.equals(SET_CONFIG_TYPE)) {
+ sendSetConfigMessage(key, value);
+ } else {
+ throw new IllegalArgumentException("Type is invalid. The possible values are {" + SET_CONFIG_TYPE + "}");
+ }
+ }
+
+ /**
+ * This method sends message of type "set-config" to the coordinator stream
+ *
+ * @param key defines the name of the configuration being set. For example, for setting the number of yarn containers,
+ * the key is "yarn.container.count"
+ * @param value defines the value associated with the key. For example, if the key is "yarn.container.count" the value
+ * is the new number of containers.
+ */
+ private void sendSetConfigMessage(String key, String value) {
+ log.info("sent SetConfig message with key = " + key + " and value = " + value);
+ coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
+ }
+
+ /**
+ * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
+ * and sends control messages.
+ * To run the code use the following command:
+ * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
+ *
+ * @param args input arguments for running the writer. These arguments are:
+ * "config-factory" = The config file factory
+ * "config-path" = The path to config file of a job
+ * "type" = type of the message being written
+ * "key" = [optional] key of the message being written
+ * "value" = [optional] value of the message being written
+ */
+ public static void main(String[] args) {
+ CoordinatorStreamWriterCommandLine cmdline = new CoordinatorStreamWriterCommandLine();
+ OptionSet options = cmdline.parser().parse(args);
+ Config config = cmdline.loadConfig(options);
+ String type = cmdline.loadType(options);
+ String key = cmdline.loadKey(options);
+ String value = cmdline.loadValue(options);
+
+ CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config);
+ writer.start();
+ writer.sendMessage(type, key, value);
+ writer.stop();
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
new file mode 100644
index 0000000..0c17800
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream
+
+import org.apache.samza.util.CommandLine
+import joptsimple.OptionSet
+
+class CoordinatorStreamWriterCommandLine extends CommandLine {
+
+ val messageType =
+ parser.accepts("type", "the type of the message being sent.")
+ .withRequiredArg
+ .ofType(classOf[java.lang.String])
+ .describedAs("Required field. This field is the type of the message being sent." +
+ " The possible values are {\"set-config\"}")
+
+
+ val messageKey =
+ parser.accepts("key", "the type of the message being sent")
+ .withRequiredArg
+ .ofType(classOf[java.lang.String])
+ .describedAs("key of the message")
+
+ val messageValue =
+ parser.accepts("value", "the type of the message being sent")
+ .withRequiredArg
+ .ofType(classOf[java.lang.String])
+ .describedAs("value of the message")
+
+ def loadType(options: OptionSet) = {
+ if (!options.has(messageType)) {
+ parser.printHelpOn(System.err)
+ System.exit(-1)
+ }
+ options.valueOf(messageType)
+ }
+
+ def loadKey(options: OptionSet): java.lang.String = {
+ if (options.has(messageKey)) {
+ options.valueOf(messageKey)
+ } else {
+ null
+ }
+ }
+
+ def loadValue(options: OptionSet) = {
+ var value: java.lang.String = null
+ if (options.has(messageValue)) {
+ value = options.valueOf(messageValue)
+ }
+
+ value
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 647cadb..84ae0b5 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -24,14 +24,18 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.apache.samza.util.Util;
+import java.util.ArrayList;
+import java.util.List;
+
+
/**
* Helper for creating mock CoordinatorStreamConsumer and
* CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just
@@ -42,6 +46,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
private static SystemConsumer mockConsumer = null;
private static boolean useCachedConsumer = false;
+
public static void enableMockConsumerCache() {
mockConsumer = null;
useCachedConsumer = true;
@@ -54,9 +59,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
/**
* Returns a consumer that sends all configs to the coordinator stream.
+ *
* @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
* The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
- * ch:source:taskname -> changelogPartition for changelog
+ * ch:source:taskname -> changelogPartition for changelog
* Everything else is processed as normal config
*/
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
@@ -80,26 +86,10 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
}
/**
- * Returns a no-op producer.
+ * Returns a MockCoordinatorSystemProducer.
*/
public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
- // A do-nothing producer.
- return new SystemProducer() {
- public void start() {
- }
-
- public void stop() {
- }
-
- public void register(String source) {
- }
-
- public void send(String source, OutgoingMessageEnvelope envelope) {
- }
-
- public void flush(String source) {
- }
- };
+ return new MockSystemProducer(null);
}
/**
@@ -115,4 +105,62 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
// Do nothing.
}
}
+
+ protected static class MockSystemProducer implements SystemProducer {
+ private final String expectedSource;
+ private final List<OutgoingMessageEnvelope> envelopes;
+ private boolean started = false;
+ private boolean registered = false;
+ private boolean flushed = false;
+
+ public MockSystemProducer(String expectedSource) {
+ this.expectedSource = expectedSource;
+ this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+ }
+
+
+ public void start() {
+ started = true;
+ }
+
+ public void stop() {
+ started = false;
+ }
+
+ public void register(String source) {
+ registered = true;
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ envelopes.add(envelope);
+ }
+
+ public void flush(String source) {
+ flushed = true;
+ }
+
+ public List<OutgoingMessageEnvelope> getEnvelopes() {
+ return envelopes;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return !started;
+ }
+
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ public boolean isFlushed() {
+ return flushed;
+ }
+
+ public String getExpectedSource() {
+ return expectedSource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 68e3255..1ef07d0 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,6 @@ import org.apache.samza.SamzaException;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.codehaus.jackson.type.TypeReference;
@@ -42,7 +40,7 @@ public class TestCoordinatorStreamSystemProducer {
public void testCoordinatorStreamSystemProducer() {
String source = "source";
SystemStream systemStream = new SystemStream("system", "stream");
- MockSystemProducer systemProducer = new MockSystemProducer(source);
+ MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source);
MockSystemAdmin systemAdmin = new MockSystemAdmin();
CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
@@ -93,59 +91,22 @@ public class TestCoordinatorStreamSystemProducer {
}
}
- private static class MockSystemProducer implements SystemProducer {
- private final String expectedSource;
- private final List<OutgoingMessageEnvelope> envelopes;
- private boolean started = false;
- private boolean stopped = false;
- private boolean registered = false;
- private boolean flushed = false;
+ private static class MockCoordinatorSystemProducer extends MockCoordinatorStreamSystemFactory.MockSystemProducer {
- public MockSystemProducer(String expectedSource) {
- this.expectedSource = expectedSource;
- this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
- }
-
- public void start() {
- started = true;
- }
-
- public void stop() {
- stopped = true;
+ public MockCoordinatorSystemProducer(String expectedSource) {
+ super(expectedSource);
}
+ @Override
public void register(String source) {
- assertEquals(expectedSource, source);
- registered = true;
- }
-
- public void send(String source, OutgoingMessageEnvelope envelope) {
- envelopes.add(envelope);
+ assertEquals(super.getExpectedSource(), source);
+ super.register(source);
}
+ @Override
public void flush(String source) {
- assertEquals(expectedSource, source);
- flushed = true;
- }
-
- public List<OutgoingMessageEnvelope> getEnvelopes() {
- return envelopes;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public boolean isStopped() {
- return stopped;
- }
-
- public boolean isRegistered() {
- return registered;
- }
-
- public boolean isFlushed() {
- return flushed;
+ assertEquals(super.getExpectedSource(), source);
+ super.flush(source);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
new file mode 100644
index 0000000..c484660
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * This class is a unit test for the CoordinatorStreamWriter class.
+ */
+public class TestCoordinatorStreamWriter {
+
+ private CoordinatorStreamWriter coordinatorStreamWriter;
+ private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer;
+
+ @Test
+ public void testCoordinatorStream() {
+
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
+ configMap.put("job.name", "coordinator-stream-writer-test");
+ Config config = new MapConfig(configMap);
+ coordinatorStreamWriter = new CoordinatorStreamWriter(config);
+ boolean exceptionHappened = false;
+
+ try {
+
+ //get coordinator system producer
+ Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer");
+ coordinatorProducerField.setAccessible(true);
+ assertNotNull(coordinatorProducerField.get(coordinatorStreamWriter));
+ CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter);
+
+ //get mock system producer
+ Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer");
+ systemProducerField.setAccessible(true);
+ systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer);
+
+ testStart();
+ testSendMessage();
+ testStop();
+
+
+ } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+ e.printStackTrace();
+ exceptionHappened = true;
+ }
+
+ assertFalse(exceptionHappened);
+
+
+ }
+
+
+ public void testStart() throws NoSuchFieldException, IllegalAccessException {
+
+ //checks before starting
+ assertFalse(systemProducer.isStarted());
+
+ //start and check if start has been done successfully
+ coordinatorStreamWriter.start();
+ assertTrue(systemProducer.isStarted());
+
+ }
+
+ public void testStop() throws NoSuchFieldException, IllegalAccessException {
+
+ //checks before stopping
+ assertTrue(systemProducer.isStarted());
+
+ //stop and check if stop has been done correctly
+ coordinatorStreamWriter.stop();
+ assertTrue(systemProducer.isStopped());
+ }
+
+ public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+
+ //check a correct message
+ assertEquals(0, systemProducer.getEnvelopes().size());
+ coordinatorStreamWriter.sendMessage("set-config", "key0", "value0");
+ assertEquals(1, systemProducer.getEnvelopes().size());
+
+ //check invalid input is handled
+ boolean exceptionHappened = false;
+ try {
+ coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid");
+ } catch (IllegalArgumentException e) {
+ exceptionHappened = true;
+ }
+ assertTrue(exceptionHappened);
+ assertEquals(1, systemProducer.getEnvelopes().size());
+
+
+ //check sendSetConfigMessage method works correctly
+ Class[] sendArgs = {String.class, String.class};
+ Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", sendArgs);
+ sendSetConfigMethod.setAccessible(true);
+ sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1");
+ assertEquals(2, systemProducer.getEnvelopes().size());
+
+
+ //check the messages are correct
+ List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
+ OutgoingMessageEnvelope envelope0 = envelopes.get(0);
+ OutgoingMessageEnvelope envelope1 = envelopes.get(1);
+ TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {
+ };
+ TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {
+ };
+ assertEquals(2, envelopes.size());
+
+ assertEquals("key0", deserialize((byte[]) envelope0.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]);
+ Map<String, String> values = (Map<String, String>) deserialize((byte[]) envelope0.getMessage(), msgRef).get("values");
+ assertEquals("value0", values.get("value"));
+
+ assertEquals("key1", deserialize((byte[]) envelope1.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]);
+ values = (Map<String, String>) deserialize((byte[]) envelope1.getMessage(), msgRef).get("values");
+ assertEquals("value1", values.get("value"));
+ }
+
+ private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
+ try {
+ if (bytes != null) {
+ String valueStr = new String((byte[]) bytes, "UTF-8");
+ return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
+ }
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+
+ return null;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/a4b831d3/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
new file mode 100644
index 0000000..d2249dd
--- /dev/null
+++ b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.coordinator.stream.CoordinatorStreamWriter "$@"