You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/08/04 01:29:45 UTC
[2/2] samza git commit: SAMZA-682: refactored coordinator stream
messages.
SAMZA-682: refactored coordinator stream messages.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/59ec3b78
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/59ec3b78
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/59ec3b78
Branch: refs/heads/master
Commit: 59ec3b789c81d17e820e1587be66e756c4daeaa6
Parents: c9fa796
Author: József Márton Jung <j....@levi9.com>
Authored: Mon Aug 3 16:30:15 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Aug 3 16:30:15 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 12 +
.../samza/checkpoint/CheckpointManager.java | 44 +-
.../apache/samza/container/LocalityManager.java | 44 +-
.../AbstractCoordinatorStreamManager.java | 112 ++++
.../stream/CoordinatorStreamMessage.java | 527 -------------------
.../stream/CoordinatorStreamSystemConsumer.java | 6 +-
.../stream/CoordinatorStreamSystemProducer.java | 4 +-
.../stream/CoordinatorStreamWriter.java | 5 +-
.../messages/CoordinatorStreamMessage.java | 321 +++++++++++
.../coordinator/stream/messages/Delete.java | 58 ++
.../stream/messages/SetChangelogMapping.java | 64 +++
.../stream/messages/SetCheckpoint.java | 74 +++
.../coordinator/stream/messages/SetConfig.java | 59 +++
.../messages/SetContainerHostMapping.java | 95 ++++
.../org/apache/samza/job/model/JobModel.java | 2 +-
.../storage/ChangelogPartitionManager.java | 45 +-
.../samza/coordinator/JobCoordinator.scala | 2 +-
.../scala/org/apache/samza/job/JobRunner.scala | 5 +-
.../MockCoordinatorStreamWrappedConsumer.java | 9 +-
.../stream/TestCoordinatorStreamMessage.java | 10 +-
.../TestCoordinatorStreamSystemConsumer.java | 15 +-
.../TestCoordinatorStreamSystemProducer.java | 9 +-
.../stream/TestCoordinatorStreamWriter.java | 1 +
.../samza/container/TestSamzaContainer.scala | 2 +-
.../resources/scalate/WEB-INF/views/index.scaml | 4 +-
25 files changed, 883 insertions(+), 646 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 6654319..24ed680 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -52,6 +52,8 @@
<allow pkg="org.apache.samza.config" />
<subpackage name="model">
+ <allow pkg="org.apache.samza.coordinator.stream.messages" />
+
<allow class="org.apache.samza.Partition" />
<allow class="org.apache.samza.container.TaskName" />
<allow class="org.apache.samza.system.SystemStreamPartition" />
@@ -101,6 +103,7 @@
<subpackage name="container">
<allow pkg="org.apache.samza.config" />
<allow pkg="org.apache.samza.coordinator.stream" />
+ <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
<subpackage name="grouper">
<subpackage name="stream">
<allow pkg="org.apache.samza.container" />
@@ -120,10 +123,12 @@
<allow pkg="org.apache.samza.system" />
<allow pkg="org.apache.samza.serializers" />
<allow pkg="org.apache.samza.util" />
+ <allow pkg="org.apache.samza.coordinator.stream.messages" />
<allow class="org.apache.samza.Partition" />
<allow class="org.apache.samza.SamzaException" />
<allow class="joptsimple.OptionSet" />
+ <allow class="org.apache.samza.container.TaskName" />
</subpackage>
<subpackage name="checkpoint">
@@ -134,6 +139,7 @@
<allow pkg="org.apache.samza.system" />
<allow class="org.apache.samza.SamzaException" />
+ <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
</subpackage>
<subpackage name="storage">
@@ -143,6 +149,7 @@
<allow pkg="org.apache.samza.serializers" />
<allow pkg="org.apache.samza.system" />
<allow pkg="org.apache.samza.task" />
+ <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
<allow pkg="org.apache.samza.util" />
<allow pkg="org.apache.samza.job" />
<allow pkg="org.apache.samza.config" />
@@ -181,4 +188,9 @@
</subpackage>
</subpackage>
+ <subpackage name="manager">
+ <allow pkg="org.apache.samza.coordinator.stream" />
+ <allow class="org.apache.samza.container.TaskName" />
+ </subpackage>
+
</import-control>
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 7445996..0185751 100644
--- a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -22,12 +22,12 @@ package org.apache.samza.checkpoint;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,34 +36,18 @@ import org.slf4j.LoggerFactory;
* The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
* CoordinatorStream underneath to do this.
*/
-public class CheckpointManager {
+public class CheckpointManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
- private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
- private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
private final Map<TaskName, Checkpoint> taskNamesToOffsets;
private final HashSet<TaskName> taskNames;
- private String source;
-
- public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
- CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
- this.coordinatorStreamConsumer = coordinatorStreamConsumer;
- this.coordinatorStreamProducer = coordinatorStreamProducer;
- taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
- taskNames = new HashSet<TaskName>();
- this.source = "Unknown";
- }
public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
String source) {
- this(coordinatorStreamProducer, coordinatorStreamConsumer);
- this.source = source;
- }
-
- public void start() {
- coordinatorStreamProducer.start();
- coordinatorStreamConsumer.start();
+ super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
+ taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
+ taskNames = new HashSet<TaskName>();
}
/**
@@ -73,8 +57,8 @@ public class CheckpointManager {
public void register(TaskName taskName) {
log.debug("Adding taskName {} to {}", taskName, this);
taskNames.add(taskName);
- coordinatorStreamConsumer.register();
- coordinatorStreamProducer.register(taskName.getTaskName());
+ registerCoordinatorStreamConsumer();
+ registerCoordinatorStreamProducer(taskName.getTaskName());
}
/**
@@ -84,8 +68,7 @@ public class CheckpointManager {
*/
public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
- SetCheckpoint checkPointMessage = new SetCheckpoint(source, taskName.getTaskName(), checkpoint);
- coordinatorStreamProducer.send(checkPointMessage);
+ send(new SetCheckpoint(getSource(), taskName.getTaskName(), checkpoint));
}
/**
@@ -96,8 +79,7 @@ public class CheckpointManager {
public Checkpoint readLastCheckpoint(TaskName taskName) {
// Bootstrap each time to make sure that we are caught up with the stream, the bootstrap will just catch up on consecutive calls
log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
- Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetCheckpoint.TYPE);
- for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+ for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetCheckpoint.TYPE)) {
SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
if (taskNames.contains(taskNameInCheckpoint)) {
@@ -108,8 +90,4 @@ public class CheckpointManager {
return taskNamesToOffsets.get(taskName);
}
- public void stop() {
- coordinatorStreamConsumer.stop();
- coordinatorStreamProducer.stop();
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 55c258f..c567bf4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,55 +19,53 @@
package org.apache.samza.container;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
/**
* Locality Manager is used to persist and read the container-to-host
* assignment information from the coordinator stream
* */
-public class LocalityManager {
+public class LocalityManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
- private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
- private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
- private static final String SOURCE = "SamzaContainer-";
private Map<Integer, Map<String, String>> containerToHostMapping;
public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
- this.coordinatorStreamConsumer = coordinatorStreamConsumer;
- this.coordinatorStreamProducer = coordinatorStreamProducer;
+ super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaContainer-");
this.containerToHostMapping = new HashMap<>();
}
- public void start() {
- coordinatorStreamProducer.start();
- coordinatorStreamConsumer.start();
+ /**
+ * This method is not supported in {@link LocalityManager}. Use {@link LocalityManager#register(String)} instead.
+ *
+ * @throws UnsupportedOperationException always, because a {@link TaskName} cannot be registered with {@link LocalityManager}
+ */
+ public void register(TaskName taskName) {
+ throw new UnsupportedOperationException("TaskName cannot be registered with LocalityManager");
}
- public void stop() {
- coordinatorStreamConsumer.stop();
- coordinatorStreamProducer.stop();
- }
-
- /*
- * Register with source suffix that is containerId
- * */
+ /**
+ * Registers the locality manager with a source suffix that is the container id
+ *
+ * @param sourceSuffix the source suffix which is a container id
+ */
public void register(String sourceSuffix) {
- coordinatorStreamConsumer.register();
- coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix);
+ registerCoordinatorStreamConsumer();
+ registerCoordinatorStreamProducer(getSource() + sourceSuffix);
}
public Map<Integer, Map<String, String>> readContainerLocality() {
Map<Integer, Map<String, String>> allMappings = new HashMap<>();
- for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) {
+ for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
SetContainerHostMapping mapping = new SetContainerHostMapping(message);
Map<String, String> localityMappings = new HashMap<>();
localityMappings.put(SetContainerHostMapping.IP_KEY, mapping.getHostLocality());
@@ -88,7 +86,7 @@ public class LocalityManager {
} else {
log.info("Container {} started at {}", containerId, hostHttpAddress);
}
- coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
+ send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
Map<String, String> mappings = new HashMap<>();
mappings.put(SetContainerHostMapping.IP_KEY, hostHttpAddress);
mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
new file mode 100644
index 0000000..ca97ce8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+
+import java.util.Set;
+
+/**
+ * Abstract class which handles the common functionality for coordinator stream consumer and producer
+ */
+public abstract class AbstractCoordinatorStreamManager {
+ private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+ private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+ private final String source;
+
+ /**
+ * Creates a new {@link AbstractCoordinatorStreamManager} with a given coordinator stream producer, consumer and with a given source.
+ * @param coordinatorStreamProducer the {@link CoordinatorStreamSystemProducer} which should be used with the {@link AbstractCoordinatorStreamManager}
+ * @param coordinatorStreamConsumer the {@link CoordinatorStreamSystemConsumer} which should be used with the {@link AbstractCoordinatorStreamManager}
+ * @param source the source for the coordinator stream producer
+ */
+ protected AbstractCoordinatorStreamManager(CoordinatorStreamSystemProducer coordinatorStreamProducer, CoordinatorStreamSystemConsumer coordinatorStreamConsumer, String source) {
+ this.coordinatorStreamProducer = coordinatorStreamProducer;
+ this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+ this.source = source;
+ }
+
+ /**
+ * Starts the underlying coordinator stream producer and consumer.
+ */
+ public void start() {
+ coordinatorStreamProducer.start();
+ coordinatorStreamConsumer.start();
+ }
+
+ /**
+ * Stops the underlying coordinator stream producer and consumer.
+ */
+ public void stop() {
+ coordinatorStreamConsumer.stop();
+ coordinatorStreamProducer.stop();
+ }
+
+ /**
+ * Sends a {@link CoordinatorStreamMessage} using the underlying system producer.
+ * @param message message which should be sent to producer
+ */
+ public void send(CoordinatorStreamMessage message) {
+ coordinatorStreamProducer.send(message);
+ }
+
+ /**
+ * Returns the set of messages from the bootstrapped stream that match the given selector.
+ * @param source the selector handed to the coordinator stream consumer; callers in this change pass a message type (e.g. {@code SetCheckpoint.TYPE})
+ * @return a set of {@link CoordinatorStreamMessage} if messages exist for the given selector, else an empty set
+ */
+ public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
+ return coordinatorStreamConsumer.getBootstrappedStream(source);
+ }
+
+ /**
+ * Register the coordinator stream consumer.
+ */
+ protected void registerCoordinatorStreamConsumer() {
+ coordinatorStreamConsumer.register();
+ }
+
+ /**
+ * Registers the coordinator stream producer for a given source.
+ * @param source the source to register
+ */
+ protected void registerCoordinatorStreamProducer(String source) {
+ coordinatorStreamProducer.register(source);
+ }
+
+ /**
+ * Returns the source name which is managed by {@link AbstractCoordinatorStreamManager}.
+ * @return the source name
+ */
+ protected String getSource() {
+ return source;
+ }
+
+ /**
+ * Registers a consumer and a producer. Every subclass should implement its own logic for registration.<br><br>
+ * Registering a single consumer and a single producer can be done with {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamConsumer()}
+ * and {@link AbstractCoordinatorStreamManager#registerCoordinatorStreamProducer(String)} methods respectively.<br>
+ * These methods can be used in the concrete implementation of this register method.
+ *
+ * @param taskName name which should be used with the producer
+ */
+ public abstract void register(TaskName taskName);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
deleted file mode 100644
index e5ab4fb..0000000
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * Represents a message for the job coordinator. All messages in the coordinator
- * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
- * messages are modeled as key/value pairs. The key is a list of well defined
- * fields: version, type, and key. The value is a map. There are some
- * pre-defined fields (such as timestamp, host, etc) for the value map, which
- * are common to all messages.
- * </p>
- *
- * <p>
- * The full structure for a CoordinatorStreamMessage is:
- * </p>
- *
- * <pre>
- * key => [1, "set-config", "job.name"]
- *
- * message => {
- * "host": "192.168.0.1",
- * "username": "criccomini",
- * "source": "job-runner",
- * "timestamp": 123456789,
- * "values": {
- * "value": "my-job-name"
- * }
- * }
- * </pre>
- *
- * Where the key's structure is:
- *
- * <pre>
- * key => [<version>, <type>, <key>]
- * </pre>
- *
- * <p>
- * Note that the white space in the above JSON blobs are done for legibility.
- * Over the wire, the JSON should be compact, and no unnecessary white space
- * should be used. This is extremely important for key serialization, since a
- * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
- * be evaluated as two different keys, and Kafka will not log compact them (if
- * Kafka is used as the underlying system for a coordinator stream).
- * </p>
- *
- * <p>
- * The "values" map in the message is defined on a per-message-type basis. For
- * set-config messages, there is just a single key/value pair, where the "value"
- * key is defined. For offset messages, there will be multiple key/values pairs
- * in "values" (one for each SystemStreamPartition/offset pair for a given
- * TaskName).
- * </p>
- *
- * <p>
- * The most important fields are type, key, and values. The type field (defined
- * as index 1 in the key list) defines the kind of message, the key (defined as
- * index 2 in the key list) defines a key to associate with the values, and the
- * values map defines a set of values associated with the type. A concrete
- * example would be a config message of type "set-config" with key "job.name"
- * and values {"value": "my-job-name"}.
- * </p>
- */
-public class CoordinatorStreamMessage {
- public static final int VERSION_INDEX = 0;
- public static final int TYPE_INDEX = 1;
- public static final int KEY_INDEX = 2;
-
- private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
-
- /**
- * Protocol version for coordinator stream messages. This version number must
- * be incremented any time new messages are added to the coordinator stream,
- * or changes are made to the key/message headers.
- */
- public static final int VERSION = 1;
-
- /**
- * Contains all key fields. Currently, this includes the type of the message,
- * the key associated with the type (e.g. type: set-config key: job.name), and
- * the version of the protocol. The indices are defined as the INDEX static
- * variables above.
- */
- private final Object[] keyArray;
-
- /**
- * Contains all fields for the message. This includes who sent the message,
- * the host, etc. It also includes a "values" map, which contains all values
- * associated with the key of the message. If set-config/job.name were used as
- * the type/key of the message, then values would contain
- * {"value":"my-job-name"}.
- */
- private final Map<String, Object> messageMap;
- private boolean isDelete;
-
- public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
- this(message.getKeyArray(), message.getMessageMap());
- }
-
- public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
- this.keyArray = keyArray;
- this.messageMap = messageMap;
- this.isDelete = messageMap == null;
- }
-
- public CoordinatorStreamMessage(String source) {
- this(source, new Object[] {Integer.valueOf(VERSION), null, null}, new HashMap<String, Object>());
- }
-
- public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
- this(keyArray, messageMap);
- if (!isDelete) {
- this.messageMap.put("values", new HashMap<String, String>());
- setSource(source);
- setUsername(System.getProperty("user.name"));
- setTimestamp(System.currentTimeMillis());
-
- try {
- setHost(InetAddress.getLocalHost().getHostAddress());
- } catch (UnknownHostException e) {
- log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
- setHost("");
- }
- }
-
- setVersion(VERSION);
- }
-
- protected void setIsDelete(boolean isDelete) {
- this.isDelete = isDelete;
- }
-
- protected void setHost(String host) {
- messageMap.put("host", host);
- }
-
- protected void setUsername(String username) {
- messageMap.put("username", username);
- }
-
- protected void setSource(String source) {
- messageMap.put("source", source);
- }
-
- protected void setTimestamp(long timestamp) {
- messageMap.put("timestamp", Long.valueOf(timestamp));
- }
-
- protected void setVersion(int version) {
- this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
- }
-
- protected void setType(String type) {
- this.keyArray[TYPE_INDEX] = type;
- }
-
- protected void setKey(String key) {
- this.keyArray[KEY_INDEX] = key;
- }
-
- @SuppressWarnings("unchecked")
- protected Map<String, String> getMessageValues() {
- return (Map<String, String>) this.messageMap.get("values");
- }
-
- protected String getMessageValue(String key) {
- return getMessageValues().get(key);
- }
-
- /**
- * @param key
- * The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
- * easy mutation of the coordinator stream outside of Samza (scripts)
- * @param value
- * The value corresponding to the key, should also be a simple string
- */
- protected void putMessageValue(String key, String value) {
- getMessageValues().put(key, value);
- }
-
- /**
- * The type of the message is used to convert a generic
- * CoordinatorStreamMessage into a specific message, such as a SetConfig
- * message.
- *
- * @return The type of the message.
- */
- public String getType() {
- return (String) this.keyArray[TYPE_INDEX];
- }
-
- /**
- * @return The whole key map including both the key and type of the message.
- */
- public Object[] getKeyArray() {
- return this.keyArray;
- }
-
- /**
- * @return Whether the message signifies a delete or not.
- */
- public boolean isDelete() {
- return isDelete;
- }
-
- /**
- * @return The username of a message.
- */
- public String getUsername() {
- return (String) this.messageMap.get("username");
- }
-
- /**
- * @return The timestamp of a message.
- */
- public long getTimestamp() {
- return (Long) this.messageMap.get("timestamp");
- }
-
- /**
- * @return The whole message map including header information.
- */
- public Map<String, Object> getMessageMap() {
- if (!isDelete) {
- Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
- // To make sure the values is immutable, we overwrite it with an immutable version of the the values map.
- immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
- return Collections.unmodifiableMap(immutableMap);
- } else {
- return null;
- }
- }
-
- /**
- * @return The source that sent the coordinator message. This is a string
- * defined by the sender.
- */
- public String getSource() {
- return (String) this.messageMap.get("source");
- }
-
- /**
- * @return The protocol version that the message conforms to.
- */
- public int getVersion() {
- return (Integer) this.keyArray[VERSION_INDEX];
- }
-
- /**
- * @return The key for a message. The key's meaning is defined by the type of
- * the message.
- */
- public String getKey() {
- return (String) this.keyArray[KEY_INDEX];
- }
-
- @Override
- public String toString() {
- return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (isDelete ? 1231 : 1237);
- result = prime * result + Arrays.hashCode(keyArray);
- result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
- if (isDelete != other.isDelete)
- return false;
- if (!Arrays.equals(keyArray, other.keyArray))
- return false;
- if (messageMap == null) {
- if (other.messageMap != null)
- return false;
- } else if (!messageMap.equals(other.messageMap))
- return false;
- return true;
- }
-
- /**
- * A coordinator stream message that tells the job coordinator to set a
- * specific configuration.
- */
- public static class SetConfig extends CoordinatorStreamMessage {
- public static final String TYPE = "set-config";
-
- public SetConfig(CoordinatorStreamMessage message) {
- super(message.getKeyArray(), message.getMessageMap());
- }
-
- public SetConfig(String source, String key, String value) {
- super(source);
- setType(TYPE);
- setKey(key);
- putMessageValue("value", value);
- }
-
- public String getConfigValue() {
- return (String) getMessageValue("value");
- }
- }
-
- public static class Delete extends CoordinatorStreamMessage {
- public Delete(String source, String key, String type) {
- this(source, key, type, VERSION);
- }
-
- /**
- * <p>
- * Delete messages must take the type of another CoordinatorStreamMessage
- * (e.g. SetConfig) to define the type of message that's being deleted.
- * Considering Kafka's log compaction, for example, the keys of a message
- * and its delete key must match exactly:
- * </p>
- *
- * <pre>
- * k=>[1,"job.name","set-config"] .. v=> {..some stuff..}
- * v=>[1,"job.name","set-config"] .. v=> null
- * </pre>
- *
- * <p>
- * Deletes are modeled as a CoordinatorStreamMessage with a null message
- * map, and a key that's identical to the key map that's to be deleted.
- * </p>
- *
- * @param source
- * The source ID of the sender of the delete message.
- * @param key
- * The key to delete.
- * @param type
- * The type of message to delete. Must correspond to one of hte
- * other CoordinatorStreamMessages.
- * @param version
- * The protocol version.
- */
- public Delete(String source, String key, String type, int version) {
- super(source);
- setType(type);
- setKey(key);
- setVersion(version);
- setIsDelete(true);
- }
- }
-
- /**
- * The SetCheckpoint is used to store the checkpoint messages for a particular task.
- * The structure looks like:
- * {
- * Key: TaskName
- * Type: set-checkpoint
- * Source: ContainerID
- * MessageMap:
- * {
- * SSP1 : offset,
- * SSP2 : offset
- * }
- * }
- */
- public static class SetCheckpoint extends CoordinatorStreamMessage {
- public static final String TYPE = "set-checkpoint";
-
- public SetCheckpoint(CoordinatorStreamMessage message) {
- super(message.getKeyArray(), message.getMessageMap());
- }
-
- /**
- *
- * @param source The source writing the checkpoint
- * @param key The key for the checkpoint message (Typically task name)
- * @param checkpoint Checkpoint message to be written to the stream
- */
- public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
- super(source);
- setType(TYPE);
- setKey(key);
- Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
- for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
- putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
- }
- }
-
- public Checkpoint getCheckpoint() {
- Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
- for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
- offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
- }
- return new Checkpoint(offsetMap);
- }
- }
-
- /**
- * The SetChanglog is used to store the changelog parition information for a particular task.
- * The structure looks like:
- * {
- * Key: TaskName
- * Type: set-changelog
- * Source: ContainerID
- * MessageMap:
- * {
- * "Partition" : partitionNumber (They key is just a dummy key here, the value contains the actual partition)
- * }
- * }
- */
- public static class SetChangelogMapping extends CoordinatorStreamMessage {
- public static final String TYPE = "set-changelog";
-
- public SetChangelogMapping(CoordinatorStreamMessage message) {
- super(message.getKeyArray(), message.getMessageMap());
- }
-
- /**
- *
- * @param source Source writing the change log mapping
- * @param taskName The task name to be used in the mapping
- * @param changelogPartitionNumber The partition to which the task's changelog is mapped to
- */
- public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
- super(source);
- setType(TYPE);
- setKey(taskName);
- putMessageValue("Partition", String.valueOf(changelogPartitionNumber));
- }
-
- public String getTaskName() {
- return getKey();
- }
-
- public int getPartition() {
- return Integer.parseInt(getMessageValue("Partition"));
- }
- }
-
- /**
- * SetContainerHostMapping is used internally by the samza framework to
- * persist the container-to-host mappings.
- *
- * Structure of the message looks like:
- * {
- * Key: $ContainerId
- * Type: set-container-host-assignment
- * Source: "SamzaContainer-$ContainerId"
- * MessageMap:
- * {
- * ip: InetAddressString,
- * jmx-url: jmxAddressString
- * jmx-tunneling-url: jmxTunnelingAddressString
- * }
- * }
- * */
- public static class SetContainerHostMapping extends CoordinatorStreamMessage {
- public static final String TYPE = "set-container-host-assignment";
- public static final String IP_KEY = "ip";
- public static final String JMX_URL_KEY = "jmx-url";
- public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
-
- public SetContainerHostMapping(CoordinatorStreamMessage message) {
- super(message.getKeyArray(), message.getMessageMap());
- }
-
- public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
- super(source);
- setType(TYPE);
- setKey(key);
- putMessageValue(IP_KEY, hostHttpAddress);
- putMessageValue(JMX_URL_KEY, jmxAddress);
- putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
- }
-
- public String getHostLocality() {
- return getMessageValue(IP_KEY);
-
- }
-
- public String getJmxUrl() {
- return getMessageValue(JMX_URL_KEY);
- }
-
- public String getJmxTunnelingUrl() {
- return getMessageValue(JMX_TUNNELING_URL_KEY);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index b1078bd..2277a73 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -30,6 +30,8 @@ import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -146,12 +148,12 @@ public class CoordinatorStreamSystemConsumer {
CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
bootstrappedStreamSet.add(coordinatorStreamMessage);
- if (CoordinatorStreamMessage.SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+ if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
String configKey = coordinatorStreamMessage.getKey();
if (coordinatorStreamMessage.isDelete()) {
configMap.remove(configKey);
} else {
- String configValue = new CoordinatorStreamMessage.SetConfig(coordinatorStreamMessage).getConfigValue();
+ String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
configMap.put(configKey, configValue);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 92f8907..42ae00b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.SystemAdmin;
@@ -126,7 +128,7 @@ public class CoordinatorStreamSystemProducer {
public void writeConfig(String source, Config config) {
log.debug("Writing config: {}", config);
for (Map.Entry<String, String> configPair : config.entrySet()) {
- send(new CoordinatorStreamMessage.SetConfig(source, configPair.getKey(), configPair.getValue()));
+ send(new SetConfig(source, configPair.getKey(), configPair.getValue()));
}
systemProducer.flush(source);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index f769756..77594dc 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -21,6 +21,7 @@ package org.apache.samza.coordinator.stream;
import joptsimple.OptionSet;
import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class CoordinatorStreamWriter {
private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);
public final static String SOURCE = "coordinator-stream-writer";
- public static final String SET_CONFIG_TYPE = CoordinatorStreamMessage.SetConfig.TYPE;
+ public static final String SET_CONFIG_TYPE = SetConfig.TYPE;
private CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;
@@ -94,7 +95,7 @@ public class CoordinatorStreamWriter {
*/
private void sendSetConfigMessage(String key, String value) {
log.info("sent SetConfig message with key = " + key + " and value = " + value);
- coordinatorStreamSystemProducer.send(new CoordinatorStreamMessage.SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
+ coordinatorStreamSystemProducer.send(new SetConfig(CoordinatorStreamWriter.SOURCE, key, value));
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
new file mode 100644
index 0000000..e3adc85
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/CoordinatorStreamMessage.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Represents a message for the job coordinator. All messages in the coordinator
+ * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
+ * messages are modeled as key/value pairs. The key is a list of well defined
+ * fields: version, type, and key. The value is a map. There are some
+ * pre-defined fields (such as timestamp, host, etc) for the value map, which
+ * are common to all messages.
+ * </p>
+ *
+ * <p>
+ * The full structure for a CoordinatorStreamMessage is:
+ * </p>
+ *
+ * <pre>
+ * key => [1, "set-config", "job.name"]
+ *
+ * message => {
+ * "host": "192.168.0.1",
+ * "username": "criccomini",
+ * "source": "job-runner",
+ * "timestamp": 123456789,
+ * "values": {
+ * "value": "my-job-name"
+ * }
+ * }
+ * </pre>
+ *
+ * Where the key's structure is:
+ *
+ * <pre>
+ * key => [<version>, <type>, <key>]
+ * </pre>
+ *
+ * <p>
+ * Note that the white space in the above JSON blobs is there only for legibility.
+ * Over the wire, the JSON should be compact, and no unnecessary white space
+ * should be used. This is extremely important for key serialization, since a
+ * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
+ * be evaluated as two different keys, and Kafka will not log compact them (if
+ * Kafka is used as the underlying system for a coordinator stream).
+ * </p>
+ *
+ * <p>
+ * The "values" map in the message is defined on a per-message-type basis. For
+ * set-config messages, there is just a single key/value pair, where the "value"
+ * key is defined. For offset messages, there will be multiple key/values pairs
+ * in "values" (one for each SystemStreamPartition/offset pair for a given
+ * TaskName).
+ * </p>
+ *
+ * <p>
+ * The most important fields are type, key, and values. The type field (defined
+ * as index 1 in the key list) defines the kind of message, the key (defined as
+ * index 2 in the key list) defines a key to associate with the values, and the
+ * values map defines a set of values associated with the type. A concrete
+ * example would be a config message of type "set-config" with key "job.name"
+ * and values {"value": "my-job-name"}.
+ * </p>
+ */
+public class CoordinatorStreamMessage {
+ public static final int VERSION_INDEX = 0;
+ public static final int TYPE_INDEX = 1;
+ public static final int KEY_INDEX = 2;
+
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
+
+ /**
+ * Protocol version for coordinator stream messages. This version number must
+ * be incremented any time new messages are added to the coordinator stream,
+ * or changes are made to the key/message headers.
+ */
+ public static final int VERSION = 1;
+
+ /**
+ * Contains all key fields. Currently, this includes the type of the message,
+ * the key associated with the type (e.g. type: set-config key: job.name), and
+ * the version of the protocol. The indices are defined as the INDEX static
+ * variables above.
+ */
+ private final Object[] keyArray;
+
+ /**
+ * Contains all fields for the message. This includes who sent the message,
+ * the host, etc. It also includes a "values" map, which contains all values
+ * associated with the key of the message. If set-config/job.name were used as
+ * the type/key of the message, then values would contain
+ * {"value":"my-job-name"}.
+ */
+ private final Map<String, Object> messageMap;
+ private boolean isDelete;
+
+ public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
+ this(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
+ this.keyArray = keyArray;
+ this.messageMap = messageMap;
+ this.isDelete = messageMap == null;
+ }
+
+ public CoordinatorStreamMessage(String source) {
+ this(source, new Object[] {Integer.valueOf(VERSION), null, null}, new HashMap<String, Object>());
+ }
+
+ public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
+ this(keyArray, messageMap);
+ if (!isDelete) {
+ this.messageMap.put("values", new HashMap<String, String>());
+ setSource(source);
+ setUsername(System.getProperty("user.name"));
+ setTimestamp(System.currentTimeMillis());
+
+ try {
+ setHost(InetAddress.getLocalHost().getHostAddress());
+ } catch (UnknownHostException e) {
+ log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
+ setHost("");
+ }
+ }
+
+ setVersion(VERSION);
+ }
+
+ protected void setIsDelete(boolean isDelete) {
+ this.isDelete = isDelete;
+ }
+
+ protected void setHost(String host) {
+ messageMap.put("host", host);
+ }
+
+ protected void setUsername(String username) {
+ messageMap.put("username", username);
+ }
+
+ protected void setSource(String source) {
+ messageMap.put("source", source);
+ }
+
+ protected void setTimestamp(long timestamp) {
+ messageMap.put("timestamp", Long.valueOf(timestamp));
+ }
+
+ protected void setVersion(int version) {
+ this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
+ }
+
+ protected void setType(String type) {
+ this.keyArray[TYPE_INDEX] = type;
+ }
+
+ protected void setKey(String key) {
+ this.keyArray[KEY_INDEX] = key;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Map<String, String> getMessageValues() {
+ return (Map<String, String>) this.messageMap.get("values");
+ }
+
+ protected String getMessageValue(String key) {
+ return getMessageValues().get(key);
+ }
+
+ /**
+ * @param key
+ * The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
+ * easy mutation of the coordinator stream outside of Samza (scripts)
+ * @param value
+ * The value corresponding to the key, should also be a simple string
+ */
+ protected void putMessageValue(String key, String value) {
+ getMessageValues().put(key, value);
+ }
+
+ /**
+ * The type of the message is used to convert a generic
+ * CoordinatorStreamMessage into a specific message, such as a SetConfig
+ * message.
+ *
+ * @return The type of the message.
+ */
+ public String getType() {
+ return (String) this.keyArray[TYPE_INDEX];
+ }
+
+ /**
+ * @return The whole key array, holding the version, type and key of the message.
+ */
+ public Object[] getKeyArray() {
+ return this.keyArray;
+ }
+
+ /**
+ * @return Whether the message signifies a delete or not.
+ */
+ public boolean isDelete() {
+ return isDelete;
+ }
+
+ /**
+ * @return The username of a message.
+ */
+ public String getUsername() {
+ return (String) this.messageMap.get("username");
+ }
+
+ /**
+ * @return The timestamp of a message.
+ */
+ public long getTimestamp() {
+ return (Long) this.messageMap.get("timestamp");
+ }
+
+ /**
+ * @return The whole message map including header information.
+ */
+ public Map<String, Object> getMessageMap() {
+ if (!isDelete) {
+ Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
+ // To make sure the values map is immutable, we overwrite it with an unmodifiable view of the values map.
+ immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
+ return Collections.unmodifiableMap(immutableMap);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * @return The source that sent the coordinator message. This is a string
+ * defined by the sender.
+ */
+ public String getSource() {
+ return (String) this.messageMap.get("source");
+ }
+
+ /**
+ * @return The protocol version that the message conforms to.
+ */
+ public int getVersion() {
+ return (Integer) this.keyArray[VERSION_INDEX];
+ }
+
+ /**
+ * @return The key for a message. The key's meaning is defined by the type of
+ * the message.
+ */
+ public String getKey() {
+ return (String) this.keyArray[KEY_INDEX];
+ }
+
+ @Override
+ public String toString() {
+ return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (isDelete ? 1231 : 1237);
+ result = prime * result + Arrays.hashCode(keyArray);
+ result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
+ if (isDelete != other.isDelete)
+ return false;
+ if (!Arrays.equals(keyArray, other.keyArray))
+ return false;
+ if (messageMap == null) {
+ if (other.messageMap != null)
+ return false;
+ } else if (!messageMap.equals(other.messageMap))
+ return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
new file mode 100644
index 0000000..964ab39
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/Delete.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+public class Delete extends CoordinatorStreamMessage {
+ public Delete(String source, String key, String type) {
+ this(source, key, type, VERSION);
+ }
+
+ /**
+ * <p>
+ * Delete messages must take the type of another CoordinatorStreamMessage
+ * (e.g. SetConfig) to define the type of message that's being deleted.
+ * Considering Kafka's log compaction, for example, the keys of a message
+ * and its delete key must match exactly:
+ * </p>
+ *
+ * <pre>
+ * k=>[1,"set-config","job.name"] .. v=> {..some stuff..}
+ * k=>[1,"set-config","job.name"] .. v=> null
+ * </pre>
+ *
+ * <p>
+ * Deletes are modeled as a CoordinatorStreamMessage with a null message
+ * map, and a key that's identical to the key map that's to be deleted.
+ * </p>
+ *
+ * @param source The source ID of the sender of the delete message.
+ * @param key The key to delete.
+ * @param type The type of message to delete. Must correspond to one of the
+ * other CoordinatorStreamMessages.
+ * @param version The protocol version.
+ */
+ public Delete(String source, String key, String type, int version) {
+ super(source);
+ setType(type);
+ setKey(key);
+ setVersion(version);
+ setIsDelete(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
new file mode 100644
index 0000000..279ae06
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetChangelogMapping.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * The {@link SetChangelogMapping} message is used to store the changelog partition information for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-changelog
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * "Partition" : partitionNumber (The key is just a dummy key here, the value contains the actual partition)
+ * }
+ * }
+ */
+public class SetChangelogMapping extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-changelog";
+
+ private static final String CHANGELOG_VALUE_KEY = "Partition";
+
+ public SetChangelogMapping(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ * The change log mapping message is used to store changelog partition information for a given task name.
+ * @param source Source writing the change log mapping
+ * @param taskName The task name to be used in the mapping
+ * @param changelogPartitionNumber The partition to which the task's changelog is mapped
+ */
+ public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
+ super(source);
+ setType(TYPE);
+ setKey(taskName);
+ putMessageValue(CHANGELOG_VALUE_KEY, String.valueOf(changelogPartitionNumber));
+ }
+
+ public String getTaskName() {
+ return getKey();
+ }
+
+ public int getPartition() {
+ return Integer.parseInt(getMessageValue(CHANGELOG_VALUE_KEY));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
new file mode 100644
index 0000000..21afa85
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetCheckpoint.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The SetCheckpoint is used to store the checkpoint messages for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-checkpoint
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * SSP1 : offset,
+ * SSP2 : offset
+ * }
+ * }
+ */
+public class SetCheckpoint extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-checkpoint";
+
+ public SetCheckpoint(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ * The SetCheckpoint is used to store a checkpoint message for a given task.
+ *
+ * @param source The source writing the checkpoint
+ * @param key The key for the checkpoint message (Typically task name)
+ * @param checkpoint Checkpoint message to be written to the stream
+ */
+ public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+ for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
+ putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
+ }
+ }
+
+ public Checkpoint getCheckpoint() {
+ Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
+ for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
+ offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
+ }
+ return new Checkpoint(offsetMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
new file mode 100644
index 0000000..bbb12f7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * A coordinator stream message that tells the job coordinator to set a
+ * specific configuration.
+ */
+public class SetConfig extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-config";
+ private static final String CONFIG_VALUE_KEY = "value";
+
+ /**
+ * The SetConfig message is used to store a specific configuration.
+ * @param message message to store which holds the configuration.
+ */
+ public SetConfig(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ * The SetConfig message is used to store a specific configuration.
+ * This constructor is used to create a SetConfig message for a given source for a specific config key and config value.
+ * @param source the source of the config
+ * @param key the key for the given config
+ * @param value the value for the given config
+ */
+ public SetConfig(String source, String key, String value) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ putMessageValue(CONFIG_VALUE_KEY, value);
+ }
+
+ /**
+ * Return the configuration value as a string.
+ * @return the configuration as a string
+ */
+ public String getConfigValue() {
+ return getMessageValue(CONFIG_VALUE_KEY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
new file mode 100644
index 0000000..5455881
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetContainerHostMapping is used internally by the Samza framework to
+ * persist the container-to-host mappings.
+ *
+ * The structure of the message looks like:
+ * {
+ * Key: $ContainerId
+ * Type: set-container-host-assignment
+ * Source: "SamzaContainer-$ContainerId"
+ * MessageMap:
+ * {
+ * ip: InetAddressString,
+ * jmx-url: jmxAddressString,
+ * jmx-tunneling-url: jmxTunnelingAddressString
+ * }
+ * }
+ */
+public class SetContainerHostMapping extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-container-host-assignment";
+ public static final String IP_KEY = "ip";
+ public static final String JMX_URL_KEY = "jmx-url";
+ public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
+
+ /**
+ * Wraps an existing coordinator stream message as a SetContainerHostMapping, reusing its key array and message map.
+ * @param message the message which holds the container to host information.
+ */
+ public SetContainerHostMapping(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ * SetContainerHostMapping is used to set the container to host mapping information.
+ * @param source the source of the message
+ * @param key the key which is used to persist the message
+ * @param hostHttpAddress the IP address of the container, stored under {@link #IP_KEY}
+ * @param jmxAddress the JMX address of the container, stored under {@link #JMX_URL_KEY}
+ * @param jmxTunnelingAddress the JMX tunneling address of the container, stored under {@link #JMX_TUNNELING_URL_KEY}
+ */
+ public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ putMessageValue(IP_KEY, hostHttpAddress);
+ putMessageValue(JMX_URL_KEY, jmxAddress);
+ putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
+ }
+
+ /**
+ * Returns the IP address of the container, i.e. the message value stored under the "ip" key.
+ * @return the container IP address
+ */
+ public String getHostLocality() {
+ return getMessageValue(IP_KEY);
+
+ }
+
+ /**
+ * Returns the JMX url of the container, i.e. the message value stored under the "jmx-url" key.
+ * @return the JMX url
+ */
+ public String getJmxUrl() {
+ return getMessageValue(JMX_URL_KEY);
+ }
+
+ /**
+ * Returns the JMX tunneling url of the container, i.e. the message value stored under the "jmx-tunneling-url" key.
+ * @return the JMX tunneling url
+ */
+ public String getJmxTunnelingUrl() {
+ return getMessageValue(JMX_TUNNELING_URL_KEY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index ad6387d..7b59274 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -74,7 +74,7 @@ public class JobModel {
* Returns the container to host mapping for a given container ID and mapping key
*
* @param containerId the ID of the container
- * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping}
+ * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
* @return the value if it exists for a given container and key, otherwise an empty string
*/
public String getContainerToHostValue(Integer containerId, String key) {
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
index 7d3409c..7e274c7 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -21,12 +21,12 @@ package org.apache.samza.storage;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,36 +34,15 @@ import org.slf4j.LoggerFactory;
/**
* The Changelog manager is used to persist and read the changelog information from the coordinator stream.
*/
-public class ChangelogPartitionManager {
+public class ChangelogPartitionManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
- private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
- private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
private boolean isCoordinatorConsumerRegistered = false;
- private String source;
-
- public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
- CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
- this.coordinatorStreamConsumer = coordinatorStreamConsumer;
- this.coordinatorStreamProducer = coordinatorStreamProducer;
- this.source = "Unknown";
- }
public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
String source) {
- this(coordinatorStreamProducer, coordinatorStreamConsumer);
- this.source = source;
- }
-
- public void start() {
- coordinatorStreamProducer.start();
- coordinatorStreamConsumer.start();
- }
-
- public void stop() {
- coordinatorStreamConsumer.stop();
- coordinatorStreamProducer.stop();
+ super(coordinatorStreamProducer, coordinatorStreamConsumer, source);
}
/**
@@ -73,10 +52,10 @@ public class ChangelogPartitionManager {
public void register(TaskName taskName) {
log.debug("Adding taskName {} to {}", taskName, this);
if (!isCoordinatorConsumerRegistered) {
- coordinatorStreamConsumer.register();
+ registerCoordinatorStreamConsumer();
isCoordinatorConsumerRegistered = true;
}
- coordinatorStreamProducer.register(taskName.getTaskName());
+ registerCoordinatorStreamProducer(taskName.getTaskName());
}
/**
@@ -85,9 +64,8 @@ public class ChangelogPartitionManager {
*/
public Map<TaskName, Integer> readChangeLogPartitionMapping() {
log.debug("Reading changelog partition information");
- Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetChangelogMapping.TYPE);
- HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
- for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+ final HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
+ for (CoordinatorStreamMessage coordinatorStreamMessage : getBootstrappedStream(SetChangelogMapping.TYPE)) {
SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
@@ -104,10 +82,7 @@ public class ChangelogPartitionManager {
log.debug("Updating changelog information with: ");
for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
- SetChangelogMapping changelogMapping = new SetChangelogMapping(source,
- entry.getKey().getTaskName(),
- entry.getValue());
- coordinatorStreamProducer.send(changelogMapping);
+ send(new SetChangelogMapping(getSource(), entry.getKey().getTaskName(), entry.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 0dbf14b..ea2eaa5 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -322,4 +322,4 @@ class JobCoordinator(
info("Stopped checkpoint manager.")
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 1c178a6..d7c928c 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -22,6 +22,7 @@ package org.apache.samza.job
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig, CoordinatorStreamMessage}
import org.apache.samza.job.ApplicationStatus.Running
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
@@ -32,8 +33,6 @@ import org.apache.samza.config.ConfigException
import org.apache.samza.config.SystemConfig
import org.apache.samza.system.SystemFactory
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage
-import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
import org.apache.samza.system.SystemStream
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
@@ -85,7 +84,7 @@ class JobRunner(config: Config) extends Logging {
val oldConfig = coordinatorSystemConsumer.getConfig();
info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
(oldConfig.keySet -- config.keySet).foreach(key => {
- coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
+ coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
})
coordinatorSystemProducer.stop
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index e454593..47a44b1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -27,6 +27,9 @@ import java.util.concurrent.CountDownLatch;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetCheckpoint;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
@@ -76,17 +79,17 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
Checkpoint cp = new Checkpoint(checkpointMap);
- CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
+ SetCheckpoint setCheckpoint = new SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
} else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
String[] changelogInfo = configPair.getKey().split(":");
String changeLogPartition = configPair.getValue();
- CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
+ SetChangelogMapping changelogMapping = new SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
keyBytes = MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8");
messgeBytes = MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8");
} else {
- CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig("source", configPair.getKey(), configPair.getValue());
+ SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue());
keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
messgeBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
index ac26a01..60e2e5d 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -24,6 +24,10 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.Delete;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.junit.Test;
public class TestCoordinatorStreamMessage {
@@ -48,8 +52,8 @@ public class TestCoordinatorStreamMessage {
@Test
public void testSetConfig() {
- CoordinatorStreamMessage.SetConfig setConfig = new CoordinatorStreamMessage.SetConfig("source", "key", "value");
- assertEquals(CoordinatorStreamMessage.SetConfig.TYPE, setConfig.getType());
+ SetConfig setConfig = new SetConfig("source", "key", "value");
+ assertEquals(SetConfig.TYPE, setConfig.getType());
assertEquals("key", setConfig.getKey());
assertEquals("value", setConfig.getConfigValue());
assertFalse(setConfig.isDelete());
@@ -58,7 +62,7 @@ public class TestCoordinatorStreamMessage {
@Test
public void testDelete() {
- CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete("source2", "key", "delete-type");
+ Delete delete = new Delete("source2", "key", "delete-type");
assertEquals("delete-type", delete.getType());
assertEquals("key", delete.getKey());
assertNull(delete.getMessageMap());
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index c25f6a7..370cfb7 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -33,6 +33,9 @@ import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.Delete;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
@@ -72,9 +75,9 @@ public class TestCoordinatorStreamSystemConsumer {
private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) {
int initialSize = bootstrappedStreamSet.size();
List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>();
- listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order1", "job.name.order1", "my-order1-name"));
- listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order2", "job.name.order2", "my-order2-name"));
- listStreamMessages.add(new CoordinatorStreamMessage.SetConfig("order3", "job.name.order3", "my-order3-name"));
+ listStreamMessages.add(new SetConfig("order1", "job.name.order1", "my-order1-name"));
+ listStreamMessages.add(new SetConfig("order2", "job.name.order2", "my-order2-name"));
+ listStreamMessages.add(new SetConfig("order3", "job.name.order3", "my-order3-name"));
bootstrappedStreamSet.addAll(listStreamMessages);
Iterator<CoordinatorStreamMessage> iter = bootstrappedStreamSet.iterator();
@@ -126,9 +129,9 @@ public class TestCoordinatorStreamSystemConsumer {
if (pollCount++ == 0) {
List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
- CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig("test", "job.name", "my-job-name");
- CoordinatorStreamMessage.SetConfig setConfig2 = new CoordinatorStreamMessage.SetConfig("test", "job.id", "1234");
- CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete("test", "job.name", CoordinatorStreamMessage.SetConfig.TYPE);
+ SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
+ SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
+ Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));
http://git-wip-us.apache.org/repos/asf/samza/blob/59ec3b78/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
index 1ef07d0..9e5e06c 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.Delete;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -43,9 +46,9 @@ public class TestCoordinatorStreamSystemProducer {
MockCoordinatorSystemProducer systemProducer = new MockCoordinatorSystemProducer(source);
MockSystemAdmin systemAdmin = new MockSystemAdmin();
CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
- CoordinatorStreamMessage.SetConfig setConfig1 = new CoordinatorStreamMessage.SetConfig(source, "job.name", "my-job-name");
- CoordinatorStreamMessage.SetConfig setConfig2 = new CoordinatorStreamMessage.SetConfig(source, "job.id", "1234");
- CoordinatorStreamMessage.Delete delete = new CoordinatorStreamMessage.Delete(source, "job.name", CoordinatorStreamMessage.SetConfig.TYPE);
+ SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
+ SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
+ Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
assertFalse(systemProducer.isRegistered());
producer.register(source);
assertTrue(systemProducer.isRegistered());