You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2015/07/17 00:08:29 UTC
samza git commit: Revert "SAMZA-704 : Create a tool to write coordinator stream"
Repository: samza
Updated Branches:
refs/heads/master a4b831d3d -> 01ee053ed
Revert "SAMZA-704 : Create a tool to write coordinator stream"
This reverts commit a4b831d3d3a555bc3cca2b0819813c6fad8bd480.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/01ee053e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/01ee053e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/01ee053e
Branch: refs/heads/master
Commit: 01ee053ed720832a1ac679525f1aa2b29fe0463e
Parents: a4b831d
Author: Navina <na...@gmail.com>
Authored: Thu Jul 16 15:07:54 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Thu Jul 16 15:07:54 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 -
checkstyle/import-control.xml.orig | 183 -------------------
.../stream/CoordinatorStreamMessage.java | 8 +-
.../stream/CoordinatorStreamWriter.java | 128 -------------
.../CoordinatorStreamWriterCommandLine.scala | 71 -------
.../MockCoordinatorStreamSystemFactory.java | 90 +++------
.../TestCoordinatorStreamSystemProducer.java | 59 +++++-
.../stream/TestCoordinatorStreamWriter.java | 166 -----------------
.../main/bash/run-coordinator-stream-writer.sh | 21 ---
9 files changed, 74 insertions(+), 653 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6654319..eef3370 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -123,7 +123,6 @@
<allow class="org.apache.samza.Partition" />
<allow class="org.apache.samza.SamzaException" />
- <allow class="joptsimple.OptionSet" />
</subpackage>
<subpackage name="checkpoint">
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/checkstyle/import-control.xml.orig
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml.orig b/checkstyle/import-control.xml.orig
deleted file mode 100644
index eef3370..0000000
--- a/checkstyle/import-control.xml.orig
+++ /dev/null
@@ -1,183 +0,0 @@
-<!DOCTYPE import-control PUBLIC
- "-//Puppy Crawl//DTD Import Control 1.1//EN"
- "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
-<!--
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
--->
-<import-control pkg="org.apache.samza">
-
- <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
-
- <!-- common library dependencies -->
- <allow pkg="java" />
- <allow pkg="javax.management" />
- <allow pkg="org.slf4j" />
- <allow pkg="org.junit" />
- <allow pkg="org.codehaus" />
- <allow pkg="org.mockito" />
- <allow pkg="org.apache.log4j" />
- <allow pkg="org.apache.kafka" />
-
- <subpackage name="config">
- <allow class="org.apache.samza.SamzaException" />
- </subpackage>
-
- <subpackage name="serializers">
- <allow pkg="org.apache.samza.config" />
-
- <subpackage name="model">
- <allow pkg="org.apache.samza.job.model" />
- <allow pkg="org.apache.samza.util" />
-
- <allow class="org.apache.samza.Partition" />
- <allow class="org.apache.samza.container.TaskName" />
- <allow class="org.apache.samza.system.SystemStreamPartition" />
- </subpackage>
- </subpackage>
-
- <subpackage name="job">
- <allow pkg="org.apache.samza.config" />
-
- <subpackage name="model">
- <allow class="org.apache.samza.Partition" />
- <allow class="org.apache.samza.container.TaskName" />
- <allow class="org.apache.samza.system.SystemStreamPartition" />
- <allow class="org.apache.samza.container.LocalityManager" />
- </subpackage>
- </subpackage>
-
- <subpackage name="system">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.serializers" />
-
- <allow class="org.apache.samza.Partition" />
- <allow class="org.apache.samza.SamzaException" />
-
- <subpackage name="chooser">
- <allow class="org.apache.samza.system.SystemStreamPartition" />
- <allow class="org.apache.samza.system.IncomingMessageEnvelope" />
- </subpackage>
-
- <subpackage name="mock">
- <allow pkg="org.apache.samza.system" />
- <allow pkg="org.apache.samza.util" />
- </subpackage>
- </subpackage>
-
- <subpackage name="util">
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.system" />
-
- <allow class="org.apache.samza.Partition" />
- <allow class="org.apache.samza.SamzaException" />
- </subpackage>
-
- <subpackage name="metrics">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.util" />
- </subpackage>
-
- <subpackage name="task">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.container" />
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.system" />
- </subpackage>
-
- <subpackage name="container">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.coordinator.stream" />
- <subpackage name="grouper">
- <subpackage name="stream">
- <allow pkg="org.apache.samza.container" />
- <allow pkg="org.apache.samza.system" />
- </subpackage>
-
- <subpackage name="task">
- <allow pkg="org.apache.samza.job" />
- </subpackage>
- </subpackage>
- </subpackage>
-
- <subpackage name="coordinator">
- <allow pkg="org.apache.samza.checkpoint" />
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.system" />
- <allow pkg="org.apache.samza.serializers" />
- <allow pkg="org.apache.samza.util" />
-
- <allow class="org.apache.samza.Partition" />
- <allow class="org.apache.samza.SamzaException" />
- </subpackage>
-
- <subpackage name="checkpoint">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.container" />
- <allow pkg="org.apache.samza.coordinator" />
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.system" />
-
- <allow class="org.apache.samza.SamzaException" />
- </subpackage>
-
- <subpackage name="storage">
- <allow pkg="org.apache.samza.container" />
- <allow pkg="org.apache.samza.coordinator" />
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.serializers" />
- <allow pkg="org.apache.samza.system" />
- <allow pkg="org.apache.samza.task" />
- <allow pkg="org.apache.samza.util" />
- <allow pkg="org.apache.samza.job" />
- <allow pkg="org.apache.samza.config" />
- <allow pkg="joptsimple" />
-
- <allow class="org.apache.samza.SamzaException" />
- <allow class="org.apache.samza.Partition" />
- </subpackage>
-
- <subpackage name="logging">
- <subpackage name="log4j">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.coordinator" />
- <allow pkg="org.apache.samza.job" />
- <allow pkg="org.apache.samza.metrics" />
- <allow pkg="org.apache.samza.system" />
- <allow pkg="org.apache.samza.serializers" />
- <allow pkg="org.apache.samza.util" />
-
- <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory" />
- <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde" />
- <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory" />
- <allow class="org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde" />
- <allow class="org.apache.samza.SamzaException" />
- </subpackage>
- </subpackage>
-
- <subpackage name="test">
- <subpackage name="integration">
- <allow pkg="org.apache.samza.config" />
- <allow pkg="org.apache.samza.container" />
- <allow pkg="org.apache.samza.system" />
- <allow pkg="org.apache.samza.storage" />
- <allow pkg="org.apache.samza.task" />
- <allow pkg="org.apache.samza.util" />
- </subpackage>
- </subpackage>
-
-</import-control>
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index e5ab4fb..6bd1bd3 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -211,7 +211,7 @@ public class CoordinatorStreamMessage {
/**
* The type of the message is used to convert a generic
- * CoordinatorStreamMessage into a specific message, such as a SetConfig
+ * CoordinatorStreaMessage into a specific message, such as a SetConfig
* message.
*
* @return The type of the message.
@@ -235,14 +235,14 @@ public class CoordinatorStreamMessage {
}
/**
- * @return The username of a message.
+ * @return Whether the message signifies a delete or not.
*/
public String getUsername() {
return (String) this.messageMap.get("username");
}
/**
- * @return The timestamp of a message.
+ * @return Whether the message signifies a delete or not.
*/
public long getTimestamp() {
return (Long) this.messageMap.get("timestamp");
@@ -254,7 +254,7 @@ public class CoordinatorStreamMessage {
public Map<String, Object> getMessageMap() {
if (!isDelete) {
Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
- // To make sure the values is immutable, we overwrite it with an immutable version of the the values map.
+ // To make sure the values is not immutable, we overwrite it with an immutable version of the the values map.
immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
return Collections.unmodifiableMap(immutableMap);
} else {
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
deleted file mode 100644
index f769756..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class writes control messages to the CoordinatorStream.
- * To use this class it, first, it should be initialized by the start() method,
- * and then use the sendMessage() function to send all the control messages needed.
- * Finally, the stop() method should be called.
- * The control messages are in the format of a (type, key, value) where:
- * type: defines the kind of message of the control message from the set {set-config}.
- * key: defines a key to associate with the value. This can be null as well for messages with no value
- * value: defines the value being sent along with the message. This can be null as well for messages with no value.
- */
-public class CoordinatorStreamWriter {
-
- private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);
- public final static String SOURCE = "coordinator-stream-writer";
- public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE;
-
- private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;
-
-
- public CoordinatorStreamWriter(Config config) {
- coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
- }
-
- /**
- * This method initializes the writer by starting a coordinator stream producer.
- */
- public void start() {
- coordinatorStreamSystemProducer.register(CoordinatorStreamWriter.SOURCE);
- coordinatorStreamSystemProducer.start();
- log.info("Started coordinator stream writer.");
- }
-
- /**
- * This method stops the writer and closes the coordinator stream producer
- */
- public void stop() {
- log.info("Stopping the coordinator stream producer.");
- coordinatorStreamSystemProducer.stop();
- }
-
- /**
- * This method sends a message to the coordinator stream. This creates a message containing (type,key,value).
- * For example if you want to set the number of yarn containers to 3, you would use
- * ("set-config", "yarn.container.count", "3").
- *
- * @param type defines the kind of message of the control message from the set {"set-config"}.
- * @param key defines a key to associate with the value. This can be null for messages with no key or value.
- * @param value defines the value being sent along with the message. This can be null for messages with no value.
- */
- public void sendMessage(String type, String key, String value) {
- //TODO: validate keys and values
- if (type.equals(SET_CONFIG_TYPE)) {
- sendSetConfigMessage(key, value);
- } else {
- throw new IllegalArgumentException("Type is invalid. The possible values are {" + SET_CONFIG_TYPE + "}");
- }
- }
-
- /**
- * This method sends message of type "set-config" to the coordinator stream
- *
- * @param key defines the name of the configuration being set. For example, for setting the number of yarn containers,
- * the key is "yarn.container.count"
- * @param value defines the value associated with the key. For example, if the key is "yarn.container.count" the value
- * is the new number of containers.
- */
- private void sendSetConfigMessage(String key, String value) {
- log.info("sent SetConfig message with key = " + key + " and value = " + value);
- coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
- }
-
- /**
- * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
- * and sends control messages.
- * To run the code use the following command:
- * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-factory={config-factory} --config-path={path to config file of a job} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
- *
- * @param args input arguments for running the writer. These arguments are:
- * "config-factory" = The config file factory
- * "config-path" = The path to config file of a job
- * "type" = type of the message being written
- * "key" = [optional] key of the message being written
- * "value" = [optional] value of the message being written
- */
- public static void main(String[] args) {
- CoordinatorStreamWriterCommandLine cmdline = new CoordinatorStreamWriterCommandLine();
- OptionSet options = cmdline.parser().parse(args);
- Config config = cmdline.loadConfig(options);
- String type = cmdline.loadType(options);
- String key = cmdline.loadKey(options);
- String value = cmdline.loadValue(options);
-
- CoordinatorStreamWriter writer = new CoordinatorStreamWriter(config);
- writer.start();
- writer.sendMessage(type, key, value);
- writer.stop();
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
deleted file mode 100644
index 0c17800..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream
-
-import org.apache.samza.util.CommandLine
-import joptsimple.OptionSet
-
-class CoordinatorStreamWriterCommandLine extends CommandLine {
-
- val messageType =
- parser.accepts("type", "the type of the message being sent.")
- .withRequiredArg
- .ofType(classOf[java.lang.String])
- .describedAs("Required field. This field is the type of the message being sent." +
- " The possible values are {\"set-config\"}")
-
-
- val messageKey =
- parser.accepts("key", "the type of the message being sent")
- .withRequiredArg
- .ofType(classOf[java.lang.String])
- .describedAs("key of the message")
-
- val messageValue =
- parser.accepts("value", "the type of the message being sent")
- .withRequiredArg
- .ofType(classOf[java.lang.String])
- .describedAs("value of the message")
-
- def loadType(options: OptionSet) = {
- if (!options.has(messageType)) {
- parser.printHelpOn(System.err)
- System.exit(-1)
- }
- options.valueOf(messageType)
- }
-
- def loadKey(options: OptionSet): java.lang.String = {
- if (options.has(messageKey)) {
- options.valueOf(messageKey)
- } else {
- null
- }
- }
-
- def loadValue(options: OptionSet) = {
- var value: java.lang.String = null
- if (options.has(messageValue)) {
- value = options.valueOf(messageValue)
- }
-
- value
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 84ae0b5..647cadb 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -24,18 +24,14 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.apache.samza.util.Util;
-import java.util.ArrayList;
-import java.util.List;
-
-
/**
* Helper for creating mock CoordinatorStreamConsumer and
* CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just
@@ -46,7 +42,6 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
private static SystemConsumer mockConsumer = null;
private static boolean useCachedConsumer = false;
-
public static void enableMockConsumerCache() {
mockConsumer = null;
useCachedConsumer = true;
@@ -59,10 +54,9 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
/**
* Returns a consumer that sends all configs to the coordinator stream.
- *
* @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
* The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
- * ch:source:taskname -> changelogPartition for changelog
+ * ch:source:taskname -> changelogPartition for changelog
* Everything else is processed as normal config
*/
public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
@@ -86,10 +80,26 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
}
/**
- * Returns a MockCoordinatorSystemProducer.
+ * Returns a no-op producer.
*/
public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
- return new MockSystemProducer(null);
+ // A do-nothing producer.
+ return new SystemProducer() {
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public void register(String source) {
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ }
+
+ public void flush(String source) {
+ }
+ };
}
/**
@@ -105,62 +115,4 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
// Do nothing.
}
}
-
- protected static class MockSystemProducer implements SystemProducer {
- private final String expectedSource;
- private final List<OutgoingMessageEnvelope> envelopes;
- private boolean started = false;
- private boolean registered = false;
- private boolean flushed = false;
-
- public MockSystemProducer(String expectedSource) {
- this.expectedSource = expectedSource;
- this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
- }
-
-
- public void start() {
- started = true;
- }
-
- public void stop() {
- started = false;
- }
-
- public void register(String source) {
- registered = true;
- }
-
- public void send(String source, OutgoingMessageEnvelope envelope) {
- envelopes.add(envelope);
- }
-
- public void flush(String source) {
- flushed = true;
- }
-
- public List<OutgoingMessageEnvelope> getEnvelopes() {
- return envelopes;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public boolean isStopped() {
- return !started;
- }
-
- public boolean isRegistered() {
- return registered;
- }
-
- public boolean isFlushed() {
- return flushed;
- }
-
- public String getExpectedSource() {
- return expectedSource;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 1ef07d0..68e3255 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.samza.SamzaException;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.codehaus.jackson.type.TypeReference;
@@ -40,7 +42,7 @@ public class TestCoordinatorStreamSystemProducer {
public void testCoordinatorStreamSystemProducer() {
String source = "source";
SystemStream systemStream = new SystemStream("system", "stream");
- MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source);
+ MockSystemProducer systemProducer = new MockSystemProducer(source);
MockSystemAdmin systemAdmin = new MockSystemAdmin();
CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
@@ -91,22 +93,59 @@ public class TestCoordinatorStreamSystemProducer {
}
}
- private static class MockCoordinatorSystemProducer extends MockCoordinatorStreamSystemFactory.MockSystemProducer {
+ private static class MockSystemProducer implements SystemProducer {
+ private final String expectedSource;
+ private final List<OutgoingMessageEnvelope> envelopes;
+ private boolean started = false;
+ private boolean stopped = false;
+ private boolean registered = false;
+ private boolean flushed = false;
- public MockCoordinatorSystemProducer(String expectedSource) {
- super(expectedSource);
+ public MockSystemProducer(String expectedSource) {
+ this.expectedSource = expectedSource;
+ this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+ }
+
+ public void start() {
+ started = true;
+ }
+
+ public void stop() {
+ stopped = true;
}
- @Override
public void register(String source) {
- assertEquals(super.getExpectedSource(), source);
- super.register(source);
+ assertEquals(expectedSource, source);
+ registered = true;
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ envelopes.add(envelope);
}
- @Override
public void flush(String source) {
- assertEquals(super.getExpectedSource(), source);
- super.flush(source);
+ assertEquals(expectedSource, source);
+ flushed = true;
+ }
+
+ public List<OutgoingMessageEnvelope> getEnvelopes() {
+ return envelopes;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ public boolean isFlushed() {
+ return flushed;
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
deleted file mode 100644
index c484660..0000000
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.codehaus.jackson.type.TypeReference;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-/**
- * This class is a unit test for the CoordinatorStreamWriter class.
- */
-public class TestCoordinatorStreamWriter {
-
- private CoordinatorStreamWriter coordinatorStreamWriter;
- private MockCoordinatorStreamSystemFactory.MockSystemProducer systemProducer;
-
- @Test
- public void testCoordinatorStream() {
-
- Map<String, String> configMap = new HashMap<>();
- configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
- configMap.put("job.name", "coordinator-stream-writer-test");
- Config config = new MapConfig(configMap);
- coordinatorStreamWriter = new CoordinatorStreamWriter(config);
- boolean exceptionHappened = false;
-
- try {
-
- //get coordinator system producer
- Field coordinatorProducerField = coordinatorStreamWriter.getClass().getDeclaredField("coordinatorStreamSystemProducer");
- coordinatorProducerField.setAccessible(true);
- assertNotNull(coordinatorProducerField.get(coordinatorStreamWriter));
- CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = (CoordinatorStreamSystemProducer) coordinatorProducerField.get(coordinatorStreamWriter);
-
- //get mock system producer
- Field systemProducerField = coordinatorStreamSystemProducer.getClass().getDeclaredField("systemProducer");
- systemProducerField.setAccessible(true);
- systemProducer = (MockCoordinatorStreamSystemFactory.MockSystemProducer) systemProducerField.get(coordinatorStreamSystemProducer);
-
- testStart();
- testSendMessage();
- testStop();
-
-
- } catch (NoSuchFieldException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
- e.printStackTrace();
- exceptionHappened = true;
- }
-
- assertFalse(exceptionHappened);
-
-
- }
-
-
- public void testStart() throws NoSuchFieldException, IllegalAccessException {
-
- //checks before starting
- assertFalse(systemProducer.isStarted());
-
- //start and check if start has been done successfully
- coordinatorStreamWriter.start();
- assertTrue(systemProducer.isStarted());
-
- }
-
- public void testStop() throws NoSuchFieldException, IllegalAccessException {
-
- //checks before stopping
- assertTrue(systemProducer.isStarted());
-
- //stop and check if stop has been done correctly
- coordinatorStreamWriter.stop();
- assertTrue(systemProducer.isStopped());
- }
-
- public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-
- //check a correct message
- assertEquals(0, systemProducer.getEnvelopes().size());
- coordinatorStreamWriter.sendMessage("set-config", "key0", "value0");
- assertEquals(1, systemProducer.getEnvelopes().size());
-
- //check invalid input is handled
- boolean exceptionHappened = false;
- try {
- coordinatorStreamWriter.sendMessage("invalid-type", "key-invalid", "value-invalid");
- } catch (IllegalArgumentException e) {
- exceptionHappened = true;
- }
- assertTrue(exceptionHappened);
- assertEquals(1, systemProducer.getEnvelopes().size());
-
-
- //check sendSetConfigMessage method works correctly
- Class[] sendArgs = {String.class, String.class};
- Method sendSetConfigMethod = coordinatorStreamWriter.getClass().getDeclaredMethod("sendSetConfigMessage", sendArgs);
- sendSetConfigMethod.setAccessible(true);
- sendSetConfigMethod.invoke(coordinatorStreamWriter, "key1", "value1");
- assertEquals(2, systemProducer.getEnvelopes().size());
-
-
- //check the messages are correct
- List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
- OutgoingMessageEnvelope envelope0 = envelopes.get(0);
- OutgoingMessageEnvelope envelope1 = envelopes.get(1);
- TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {
- };
- TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {
- };
- assertEquals(2, envelopes.size());
-
- assertEquals("key0", deserialize((byte[]) envelope0.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]);
- Map<String, String> values = (Map<String, String>) deserialize((byte[]) envelope0.getMessage(), msgRef).get("values");
- assertEquals("value0", values.get("value"));
-
- assertEquals("key1", deserialize((byte[]) envelope1.getKey(), keyRef)[CoordinatorStreamMessage.KEY_INDEX]);
- values = (Map<String, String>) deserialize((byte[]) envelope1.getMessage(), msgRef).get("values");
- assertEquals("value1", values.get("value"));
- }
-
- private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
- try {
- if (bytes != null) {
- String valueStr = new String((byte[]) bytes, "UTF-8");
- return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
- }
- } catch (Exception e) {
- throw new SamzaException(e);
- }
-
- return null;
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/01ee053e/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh b/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
deleted file mode 100644
index d2249dd..0000000
--- a/samza-shell/src/main/bash/run-coordinator-stream-writer.sh
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
-
-exec $(dirname $0)/run-class.sh org.apache.samza.coordinator.stream.CoordinatorStreamWriter "$@"