You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:41 UTC

[18/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
new file mode 100644
index 0000000..1edcb91
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+
+/**
+ * An interface for sending protocol messages from a node to the cluster manager.
+ * @author unattributed
+ */
+public interface NodeProtocolSender {
+    
+    /**
+     * Sends a "connection request" message to the cluster manager.
+     * @param msg a message
+     * @return the response
+     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws ProtocolException if communication failed
+     */
+    ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
+    
+    /**
+     * Sends a "heartbeat" message to the cluster manager.
+     * @param msg a message
+     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws ProtocolException if communication failed
+     */
+    void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
+    
+    /**
+     * Sends a bulletins message to the cluster manager.
+     * @param msg a message
+     * @throws ProtocolException if communication failed
+     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     */
+    void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;
+    
+    /**
+     * Sends a failure notification if the controller was unable to start.
+     * @param msg a message
+     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws ProtocolException if communication failed
+     */
+    void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
+    
+    /**
+     * Sends a failure notification if the node was unable to reconnect to the cluster.
+     * @param msg a message
+     * @throws UnknownServiceAddressException if the cluster manager's address is not known
+     * @throws ProtocolException if communication failed
+     */
+    void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
new file mode 100644
index 0000000..b614e76
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+/**
+ * The context for communicating using the internal cluster protocol. 
+ * 
+ * @param <T> The type of protocol message.
+ * 
+ * @author unattributed
+ */
+public interface ProtocolContext<T> {
+ 
+    /**
+     * Creates a marshaller for serializing protocol messages.
+     * @return a marshaller
+     */
+    ProtocolMessageMarshaller<T> createMarshaller();
+    
+    /**
+     * Creates an unmarshaller for deserializing protocol messages.
+     * @return an unmarshaller
+     */
+    ProtocolMessageUnmarshaller<T> createUnmarshaller();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
new file mode 100644
index 0000000..f11ad84
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+/**
+ * The base exception for problems encountered while communicating within the
+ * cluster.  It extends RuntimeException, so callers are not forced to catch it.
+ * @author unattributed
+ */
+public class ProtocolException extends RuntimeException {
+    
+    public ProtocolException() {
+    }
+    
+    public ProtocolException(String msg) {
+        super(msg);
+    }
+    
+    public ProtocolException(Throwable cause) {
+        super(cause);
+    }
+    
+    public ProtocolException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
new file mode 100644
index 0000000..6de87db
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+
+/**
+ * A handler for processing protocol messages.
+ * @author unattributed
+ */
+public interface ProtocolHandler {
+    
+    /**
+     * Handles the given protocol message or throws an exception if it cannot
+     * handle the message.  If no response is needed by the protocol, then null
+     * should be returned.
+     * 
+     * @param msg a message
+     * @return a response or null, if no response is necessary
+     * 
+     * @throws ProtocolException if the message could not be processed
+     */
+    ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException;
+    
+    /**
+     * @param msg a message
+     * @return true if the handler can process the given message; false otherwise
+     */
+    boolean canHandle(ProtocolMessage msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
new file mode 100644
index 0000000..32f0f5d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.nifi.reporting.BulletinRepository;
+
+/**
+ * Defines the interface for a listener to process protocol messages.
+ * @author unattributed
+ */
+public interface ProtocolListener {
+    
+    /**
+     * Starts the instance for listening for messages.  Start may only be called
+     * if the instance is not running.
+     * @throws java.io.IOException if an I/O error occurs while starting the instance
+     */
+    void start() throws IOException;
+    
+    /**
+     * Stops the instance from listening for messages.  Stop may only be called
+     * if the instance is running.
+     * @throws java.io.IOException if an I/O error occurs while stopping the instance
+     */
+    void stop() throws IOException;
+    
+    /**
+     * @return true if the instance is started; false otherwise.
+     */
+    boolean isRunning();
+    
+    /**
+     * @return the handlers registered with the listener
+     */
+    Collection<ProtocolHandler> getHandlers();
+    
+    /**
+     * Registers a handler with the listener.
+     * @param handler a handler
+     */
+    void addHandler(ProtocolHandler handler);
+    
+    /**
+     * Sets the BulletinRepository that can be used to report bulletins.
+     * @param bulletinRepository the repository that bulletins can be reported to
+     */
+    void setBulletinRepository(BulletinRepository bulletinRepository);
+    
+    /**
+     * Unregisters the handler with the listener.
+     * @param handler a handler
+     * @return true if the handler was removed; false otherwise
+     */
+    boolean removeHandler(ProtocolHandler handler);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
new file mode 100644
index 0000000..bb436e0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Defines a marshaller for serializing protocol messages.
+ * 
+ * @param <T> The type of protocol message.
+ * 
+ * @author unattributed
+ */
+public interface ProtocolMessageMarshaller<T> {
+    
+    /**
+     * Serializes the given message to the given output stream.
+     * @param msg the message to serialize
+     * @param os the output stream that receives the serialized message
+     * @throws IOException if the message could not be serialized to the stream
+     */
+    void marshal(T msg, OutputStream os) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
new file mode 100644
index 0000000..c690e7b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Defines an unmarshaller for deserializing protocol messages.
+ * 
+ * @param <T> The type of protocol message.
+ * 
+ * @author unattributed
+ */
+public interface ProtocolMessageUnmarshaller<T> {
+    
+    /**
+     * Deserializes a message on the given input stream.
+     * @param is an input stream
+     * @return the deserialized message
+     * @throws IOException if the message could not be deserialized from the stream
+     */
+    T unmarshal(InputStream is) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
new file mode 100644
index 0000000..c2d16fc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;
+
+/**
+ * Represents a dataflow, which includes the raw bytes of the flow.xml and 
+ * whether processors should be started automatically at application startup.
+ */
+@XmlJavaTypeAdapter(DataFlowAdapter.class)
+public class StandardDataFlow implements Serializable, DataFlow {
+    
+    private final byte[] flow;
+    private final byte[] templateBytes;
+    private final byte[] snippetBytes;
+
+    private boolean autoStartProcessors;
+    
+    /**
+     * Constructs an instance.  The given arrays are stored as-is: this
+     * constructor performs no null check and makes no defensive copy.
+     * 
+     * @param flow a valid flow as bytes; documented as non-null, but that is
+     *     not enforced here
+     * @param templateBytes an XML representation of templates
+     * @param snippetBytes an XML representation of snippets
+     */
+    public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
+        this.flow = flow;
+        this.templateBytes = templateBytes;
+        this.snippetBytes = snippetBytes;
+    }
+    
+    public StandardDataFlow(final DataFlow toCopy) {
+        this.flow = copy(toCopy.getFlow());
+        this.templateBytes = copy(toCopy.getTemplates());
+        this.snippetBytes = copy(toCopy.getSnippets());
+        this.autoStartProcessors = toCopy.isAutoStartProcessors();
+    }
+    
+    private static byte[] copy(final byte[] bytes) {
+        return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
+    }
+    
+    /**
+     * @return the raw byte array of the flow (the internal array, not a copy)
+     */
+    public byte[] getFlow() {
+        return flow;
+    }
+
+    /**
+     * @return the raw byte array of the templates (the internal array, not a copy)
+     */
+    public byte[] getTemplates() {
+        return templateBytes;
+    }
+    
+    /**
+     * @return the raw byte array of the snippets (the internal array, not a copy)
+     */
+    public byte[] getSnippets() {
+        return snippetBytes;
+    }
+    
+    /**
+     * @return true if processors should be automatically started at application 
+     * startup; false otherwise 
+     */
+    public boolean isAutoStartProcessors() {
+        return autoStartProcessors;
+    }
+    
+    /**
+     * 
+     * Sets the flag to automatically start processors at application startup.
+     * 
+     * @param autoStartProcessors true if processors should be automatically
+     * started at application startup; false otherwise
+     */
+    public void setAutoStartProcessors(final boolean autoStartProcessors) {
+        this.autoStartProcessors = autoStartProcessors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
new file mode 100644
index 0000000..41c74eb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol;
+
+/**
+ * Represents the exceptional case when a service's address is not known (unchecked).
+ * @author unattributed
+ */
+public class UnknownServiceAddressException extends RuntimeException {
+    
+    public UnknownServiceAddressException() {
+    }
+    
+    public UnknownServiceAddressException(String msg) {
+        super(msg);
+    }
+    
+    public UnknownServiceAddressException(Throwable cause) {
+        super(cause);
+    }
+    
+    public UnknownServiceAddressException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
new file mode 100644
index 0000000..ceb3fcb
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.FormatUtils;
+
+/**
+ * A protocol sender for sending protocol messages from the cluster manager to
+ * nodes.  
+ * 
+ * Connection-type requests (e.g., reconnection, disconnection) by nature of 
+ * starting/stopping flow controllers take longer than other types of protocol 
+ * messages.  Therefore, a handshake timeout may be specified to lengthen the 
+ * allowable time for communication with the node.
+ * 
+ * @author unattributed
+ */
+public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender {
+
+    
+    private final ProtocolContext<ProtocolMessage> protocolContext;  // supplies the marshaller/unmarshaller for each exchange
+    private final SocketConfiguration socketConfiguration;  // must be non-null (enforced by the constructor)
+    private int handshakeTimeoutSeconds;  // initialized to -1, i.e. not configured
+    private volatile BulletinRepository bulletinRepository;  // assigned via setBulletinRepository
+
+    public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
+        if(socketConfiguration == null || protocolContext == null) {
+            throw new IllegalArgumentException(socketConfiguration == null
+                    ? "Socket configuration may not be null."
+                    : "Protocol Context may not be null.");
+        }
+        this.protocolContext = protocolContext;
+        this.socketConfiguration = socketConfiguration;
+        this.handshakeTimeoutSeconds = -1;  // a negative value marks the timeout as not configured
+    }
+    
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        this.bulletinRepository = bulletinRepository;  // plain store into the volatile field; null is not rejected
+    }
+
+    /**
+     * Asks a node for its data flow and waits for the node's reply on the same socket.
+     * The socket is handed to SocketUtils.closeQuietly before this method exits.
+     * @param msg the flow request; its node identifier is used to open the socket
+     * @return the node's flow response
+     * @throws ProtocolException if the request could not be marshalled, the reply could not be unmarshalled, or the reply was not a flow response
+     */
+    @Override
+    public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
+        Socket socket = null;
+        try {
+            socket = createSocket(msg.getNodeId(), false);
+            
+            // write the request to the node
+            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+            try {
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+            
+            // read the node's reply
+            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+            final ProtocolMessage response;
+            try {
+                response = unmarshaller.unmarshal(socket.getInputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe);
+            }
+            
+            // any reply other than a flow response is rejected
+            if(MessageType.FLOW_RESPONSE != response.getType()) {
+                throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
+            }
+            return (FlowResponseMessage) response;
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * Requests a node to reconnect to the cluster.  The configured value for
+     * handshake timeout is applied to the socket before making the request.
+     * @param msg a message
+     * @return the response
+     * @throws ProtocolException if the message failed to be sent or the response was malformed
+     */
+    @Override
+    public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
+        Socket socket = null;
+        try {
+        	socket = createSocket(msg.getNodeId(), true);
+
+            // marshal message to output stream
+            try {
+                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+            
+            
+            final ProtocolMessage response;
+            try {
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+                response = unmarshaller.unmarshal(socket.getInputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
+            } 
+            
+            if(MessageType.RECONNECTION_RESPONSE == response.getType()) {
+                return (ReconnectionResponseMessage) response;
+            } else {
+                throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+    
+    /**
+     * Tells a node to disconnect from the cluster.  The socket is created with
+     * the handshake timeout option enabled, and no reply is read from the node.
+     * @param msg the disconnect message; its node identifier selects the recipient
+     * @throws ProtocolException if the message failed to be sent
+     */
+    @Override
+    public void disconnect(final DisconnectMessage msg) throws ProtocolException {
+        Socket connection = null;
+        try {
+            connection = createSocket(msg.getNodeId(), true);
+
+            // one-way message: write it out, then fall through to close the socket
+            try {
+                protocolContext.createMarshaller().marshal(msg, connection.getOutputStream());
+            } catch(final IOException e) {
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + e, e);
+            }
+
+        } finally {
+            SocketUtils.closeQuietly(connection);
+        }
+    }
+
+    /**
+     * Sends a primary role assignment message to a node.
+     *
+     * @param msg the assignment message; its node identifier selects the recipient
+     *
+     * @throws ProtocolException if the message failed to be sent
+     */
+    @Override
+    public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
+        Socket connection = null;
+        try {
+            connection = createSocket(msg.getNodeId(), true);
+
+            // write the message to the node; no reply is read
+            try {
+                protocolContext.createMarshaller().marshal(msg, connection.getOutputStream());
+            } catch(final IOException e) {
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + e, e);
+            }
+
+        } finally {
+            SocketUtils.closeQuietly(connection);
+        }
+    }
+    
+    
+    private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
+        // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
+        if(handshakeTimeoutSeconds >= 0) {
+            socket.setSoTimeout(handshakeTimeoutSeconds * 1000);
+        }   
+    }
+    
+    /**
+     * @return the socket configuration this sender passes to
+     *      {@code SocketUtils.createSocket} when it opens a socket
+     */
+    public SocketConfiguration getSocketConfiguration() {
+        return socketConfiguration;
+    }
+
+    /**
+     * @return the handshake timeout in seconds; a negative value means no
+     *      handshake timeout is applied to sockets
+     */
+    public int getHandshakeTimeoutSeconds() {
+        return handshakeTimeoutSeconds;
+    }
+
+    /**
+     * Sets the handshake timeout from a duration string.  The string is
+     * converted to seconds by {@code FormatUtils.getTimeDuration} and the
+     * result is narrowed to an int.
+     *
+     * @param handshakeTimeout the duration string to convert
+     */
+    public void setHandshakeTimeout(final String handshakeTimeout) {
+        this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(handshakeTimeout, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Creates a socket to the given node, using the node identifier's socket
+     * address and socket port.
+     *
+     * @param nodeId the node to connect to
+     * @param applyHandshakeTimeout whether the handshake timeout should be applied to the new socket
+     * @return the socket
+     */
+    private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) {
+    	return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
+    }
+    
+    /**
+     * Creates a socket to the given host and port using this sender's socket
+     * configuration, optionally applying the handshake timeout to it.
+     *
+     * @param host the host to connect to
+     * @param port the port to connect to
+     * @param applyHandshakeTimeout whether the handshake timeout should be applied to the new socket
+     * @return the socket
+     * @throws ProtocolException if the socket could not be created or configured
+     */
+    private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) {
+        Socket socket = null;
+        try {
+            // create a socket
+            socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration);
+            if ( applyHandshakeTimeout ) {
+                setConnectionHandshakeTimeoutOnSocket(socket);
+            }
+            return socket;
+        } catch(final IOException ioe) {
+            // the socket may already exist if only the timeout configuration failed;
+            // close it here because the caller never receives a reference to it
+            if ( socket != null ) {
+                SocketUtils.closeQuietly(socket);
+            }
+            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
new file mode 100644
index 0000000..933e5fa
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.reporting.BulletinRepository;
+
+/**
+ * Bundles a cluster manager's protocol sender and protocol listener behind a
+ * single object.  Lifecycle and handler operations are forwarded to the
+ * listener; outgoing requests are forwarded to the sender.
+ *
+ * @author unattributed
+ */
+public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener {
+
+    private final ClusterManagerProtocolSender sender;
+
+    private final ProtocolListener listener;
+
+    /**
+     * @param sender the sender that outgoing requests are delegated to
+     * @param listener the listener that lifecycle and handler calls are delegated to
+     * @throws IllegalArgumentException if either argument is null
+     */
+    public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) {
+        if(sender == null) {
+            throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null.");
+        }
+        if(listener == null) {
+            throw new IllegalArgumentException("ProtocolListener may not be null.");
+        }
+        this.listener = listener;
+        this.sender = sender;
+    }
+
+    // ---- listener lifecycle ----
+
+    @Override
+    public boolean isRunning() {
+        return listener.isRunning();
+    }
+
+    /**
+     * Starts the listener.
+     *
+     * @throws IllegalStateException if the instance is already running
+     */
+    @Override
+    public void start() throws IOException {
+        final boolean alreadyStarted = isRunning();
+        if(alreadyStarted) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+        listener.start();
+    }
+
+    /**
+     * Stops the listener.
+     *
+     * @throws IllegalStateException if the instance is not running
+     */
+    @Override
+    public void stop() throws IOException {
+        final boolean alreadyStopped = !isRunning();
+        if(alreadyStopped) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+        listener.stop();
+    }
+
+    // ---- listener handlers ----
+
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        listener.addHandler(handler);
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        return listener.removeHandler(handler);
+    }
+
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return listener.getHandlers();
+    }
+
+    /**
+     * Hands the bulletin repository to the listener first and then to the sender.
+     */
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        listener.setBulletinRepository(bulletinRepository);
+        sender.setBulletinRepository(bulletinRepository);
+    }
+
+    // ---- sender requests ----
+
+    @Override
+    public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
+        return sender.requestFlow(msg);
+    }
+
+    @Override
+    public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
+        return sender.requestReconnection(msg);
+    }
+
+    @Override
+    public void disconnect(final DisconnectMessage msg) throws ProtocolException {
+        sender.disconnect(msg);
+    }
+
+    @Override
+    public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
+        sender.assignPrimaryRole(msg);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
new file mode 100644
index 0000000..24e51e0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery;
+import org.apache.nifi.reporting.BulletinRepository;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation for discovering services by way of "service broadcast" type
+ * protocol messages over multicast.
+ *
+ * The client caller is responsible for starting and stopping the service
+ * discovery. The instance must be stopped before termination of the JVM to
+ * ensure proper resource clean-up.
+ *
+ * @author unattributed
+ */
+public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class);
+    private final String serviceName;
+    private final MulticastConfiguration multicastConfiguration;
+    private final MulticastProtocolListener listener;
+    private volatile BulletinRepository bulletinRepository;
+
+    /*
+     * guarded by this
+     */
+    private DiscoverableService service;
+
+    
+    public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress,
+            final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
+
+        if (StringUtils.isBlank(serviceName)) {
+            throw new IllegalArgumentException("Service name may not be null or empty.");
+        } else if (multicastAddress == null) {
+            throw new IllegalArgumentException("Multicast address may not be null.");
+        } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
+            throw new IllegalArgumentException("Multicast group must be a Class D address.");
+        } else if (protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        } else if (multicastConfiguration == null) {
+            throw new IllegalArgumentException("Multicast configuration may not be null.");
+        }
+
+        this.serviceName = serviceName;
+        this.multicastConfiguration = multicastConfiguration;
+        this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext);
+        listener.addHandler(new ClusterManagerServiceBroadcastHandler());
+    }
+
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        this.bulletinRepository = bulletinRepository;
+    }
+
+    @Override
+    public synchronized DiscoverableService getService() {
+        return service;
+    }
+
+    @Override
+    public InetSocketAddress getMulticastAddress() {
+        return listener.getMulticastAddress();
+    }
+
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return Collections.unmodifiableCollection(listener.getHandlers());
+    }
+
+    @Override
+    public void addHandler(ProtocolHandler handler) {
+        listener.addHandler(handler);
+    }
+
+    @Override
+    public boolean removeHandler(ProtocolHandler handler) {
+        return listener.removeHandler(handler);
+    }
+
+    @Override
+    public boolean isRunning() {
+        return listener.isRunning();
+    }
+
+    @Override
+    public void start() throws IOException {
+        if (isRunning()) {
+            throw new IllegalStateException("Instance is already running.");
+        }
+        listener.start();
+    }
+
+    @Override
+    public void stop() throws IOException {
+        if (isRunning() == false) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+        listener.stop();
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public MulticastConfiguration getMulticastConfiguration() {
+        return multicastConfiguration;
+    }
+
+    private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler {
+
+        @Override
+        public boolean canHandle(final ProtocolMessage msg) {
+            return MessageType.SERVICE_BROADCAST == msg.getType();
+        }
+
+        @Override
+        public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+            synchronized (ClusterServiceDiscovery.this) {
+                if (canHandle(msg) == false) {
+                    throw new ProtocolException("Handler cannot handle message type: " + msg.getType());
+                } else {
+                    final ServiceBroadcastMessage broadcastMsg = (ServiceBroadcastMessage) msg;
+                    if (serviceName.equals(broadcastMsg.getServiceName())) {
+                        final DiscoverableService oldService = service;
+                        if (oldService == null
+                                || broadcastMsg.getAddress().equalsIgnoreCase(oldService.getServiceAddress().getHostName()) == false
+                                || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) {
+                            service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort()));
+                            final InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress();
+                            logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress())));
+                        }
+                    }
+                    return null;
+                }
+            }
+        }
+    }
+
+    private String prettyPrint(final InetSocketAddress address) {
+        if (address == null) {
+            return "0.0.0.0:0";
+        } else {
+            return address.getHostName() + ":" + address.getPort();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
new file mode 100644
index 0000000..bebfde8
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
+import org.apache.nifi.io.socket.multicast.ServiceDiscovery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Locates the socket address of a cluster service (an implementation of
+ * ServiceDiscovery).  Depending on configuration, the address may be located
+ * using service discovery.  If using service discovery, then the service methods
+ * must be used for starting and stopping discovery.
+ *
+ * Service discovery may be used in conjunction with a fixed port.  In this case,
+ * the service discovery will yield the service IP/host while the fixed port will
+ * be used for the port.
+ *
+ * Alternatively, the instance may be configured with exact service location, in
+ * which case, no service discovery occurs and the caller will always receive the
+ * configured service.
+ *
+ * @author unattributed
+ */
+public class ClusterServiceLocator implements ServiceDiscovery {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class);
+
+    private final String serviceName;
+
+    private final ClusterServiceDiscovery serviceDiscovery;
+
+    private final DiscoverableService fixedService;
+
+    private final int fixedServicePort;
+
+    // guarded by this
+    private final AttemptsConfig attemptsConfig = new AttemptsConfig();
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    /**
+     * Creates a locator that returns whatever the given discovery finds.
+     *
+     * @param serviceDiscovery the discovery to query; may not be null
+     */
+    public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) {
+        if(serviceDiscovery == null) {
+            throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
+        }
+        this.serviceDiscovery = serviceDiscovery;
+        this.fixedService = null;
+        this.fixedServicePort = 0;
+        this.serviceName = serviceDiscovery.getServiceName();
+    }
+
+    /**
+     * Creates a locator that combines the discovered host with a fixed port.
+     *
+     * @param serviceDiscovery the discovery to query; may not be null
+     * @param fixedServicePort the port to use instead of the discovered one; only applied when positive
+     */
+    public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) {
+        if(serviceDiscovery == null) {
+            throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
+        }
+        this.serviceDiscovery = serviceDiscovery;
+        this.fixedService = null;
+        this.fixedServicePort = fixedServicePort;
+        this.serviceName = serviceDiscovery.getServiceName();
+    }
+
+    /**
+     * Creates a locator that always returns the given service; no discovery occurs.
+     *
+     * @param fixedService the service to return; may not be null
+     */
+    public ClusterServiceLocator(final DiscoverableService fixedService) {
+        if(fixedService == null) {
+            throw new IllegalArgumentException("Service may not be null.");
+        }
+        this.serviceDiscovery = null;
+        this.fixedService = fixedService;
+        this.fixedServicePort = 0;
+        this.serviceName = fixedService.getServiceName();
+    }
+
+    /**
+     * Returns the fixed service if one was configured.  Otherwise queries the
+     * service discovery up to the configured number of attempts, pausing for
+     * the configured time after every failed attempt (including the last one).
+     *
+     * If the calling thread is interrupted while pausing, the interrupt status
+     * is restored and the method gives up immediately.
+     *
+     * @return the located service, or null if it could not be located
+     */
+    @Override
+    public DiscoverableService getService() {
+
+        // a fixed service needs neither discovery nor retries
+        if(fixedService != null) {
+            return fixedService;
+        }
+
+        final int numAttemptsValue;
+        final long millisBetweenAttempts;
+        synchronized(this) {
+            numAttemptsValue = attemptsConfig.numAttempts;
+            // honor the configured unit; computed as long so large values cannot overflow
+            millisBetweenAttempts = attemptsConfig.timeBetweenAttempsUnit.toMillis(attemptsConfig.timeBetweenAttempts);
+        }
+
+        // try for a configured amount of attempts to retrieve the service address
+        for(int i = 0; i < numAttemptsValue; i++) {
+
+            if(serviceDiscovery != null) {
+
+                final DiscoverableService discoveredService = serviceDiscovery.getService();
+
+                // if we received an address
+                if(discoveredService != null) {
+                    // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address
+                    if(fixedServicePort > 0) {
+                        // create service using discovered service name and address with fixed service port
+                        final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort);
+                        return new DiscoverableServiceImpl(discoveredService.getServiceName(), addr);
+                    } else {
+                        return discoveredService;
+                    }
+                }
+            }
+
+            // could not obtain service address, so sleep a bit.  The pause is kept
+            // after the final attempt as well, so callers that poll this method in
+            // a loop are still throttled.
+            try {
+                logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed.  Trying again in %d milliseconds.",
+                    serviceName, (i + 1), numAttemptsValue, millisBetweenAttempts));
+                Thread.sleep(millisBetweenAttempts);
+            } catch(final InterruptedException ie) {
+                // restore the interrupt status so the caller can observe the interruption
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the running state of the service discovery if one is configured;
+     *      otherwise this locator's own running flag
+     */
+    public boolean isRunning() {
+        if(serviceDiscovery != null) {
+            return serviceDiscovery.isRunning();
+        } else {
+            return running.get();
+        }
+    }
+
+    /**
+     * Starts the service discovery, if one is configured, and marks this locator as running.
+     *
+     * @throws IOException if the service discovery failed to start
+     * @throws IllegalStateException if the instance is already running
+     */
+    public void start() throws IOException {
+
+        if(isRunning()) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+
+        if(serviceDiscovery != null) {
+            serviceDiscovery.start();
+        }
+        running.set(true);
+    }
+
+    /**
+     * Stops the service discovery, if one is configured, and marks this locator as stopped.
+     *
+     * @throws IOException if the service discovery failed to stop
+     * @throws IllegalStateException if the instance is not running
+     */
+    public void stop() throws IOException {
+
+        if(isRunning() == false) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+
+        if(serviceDiscovery != null) {
+            serviceDiscovery.stop();
+        }
+        running.set(false);
+    }
+
+    /**
+     * Copies the values of the given configuration into this locator.
+     *
+     * @param config the configuration to copy; may not be null
+     */
+    public synchronized void setAttemptsConfig(final AttemptsConfig config) {
+        if(config == null) {
+            throw new IllegalArgumentException("Attempts configuration may not be null.");
+        }
+        this.attemptsConfig.numAttempts = config.numAttempts;
+        this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts;
+        this.attemptsConfig.timeBetweenAttempsUnit = config.timeBetweenAttempsUnit;
+    }
+
+    /**
+     * @return a copy of this locator's attempts configuration
+     */
+    public synchronized AttemptsConfig getAttemptsConfig() {
+        final AttemptsConfig config = new AttemptsConfig();
+        config.numAttempts = this.attemptsConfig.numAttempts;
+        config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts;
+        config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit;
+        return config;
+    }
+
+    /**
+     * Retry settings for locating a service: how many attempts to make and how
+     * long to pause after a failed attempt.  Defaults to one attempt and a
+     * pause of one second.
+     */
+    public static class AttemptsConfig {
+
+        private int numAttempts = 1;
+
+        private int timeBetweenAttempts = 1;
+
+        private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS;
+
+        public int getNumAttempts() {
+            return numAttempts;
+        }
+
+        /**
+         * @param numAttempts the number of attempts; must be positive
+         */
+        public void setNumAttempts(int numAttempts) {
+            if(numAttempts <= 0) {
+                throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts);
+            }
+            this.numAttempts = numAttempts;
+        }
+
+        public TimeUnit getTimeBetweenAttemptsUnit() {
+            return timeBetweenAttempsUnit;
+        }
+
+        /**
+         * @param timeBetweenAttempsUnit the unit of the time between attempts; may not be null
+         */
+        public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) {
+            // validate the argument itself: the unit is dereferenced when the pause is computed
+            if(timeBetweenAttempsUnit == null) {
+                throw new IllegalArgumentException("Time between attempts unit may not be null.");
+            }
+            this.timeBetweenAttempsUnit = timeBetweenAttempsUnit;
+        }
+
+        public int getTimeBetweenAttempts() {
+            return timeBetweenAttempts;
+        }
+
+        /**
+         * @param timeBetweenAttempts the time between attempts, in the configured unit; must be positive
+         */
+        public void setTimeBetweenAttempts(int timeBetweenAttempts) {
+            if(timeBetweenAttempts <= 0) {
+                throw new IllegalArgumentException("Time between attempts must be positive: " + timeBetweenAttempts);
+            }
+            this.timeBetweenAttempts = timeBetweenAttempts;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
new file mode 100644
index 0000000..e9e7d5b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster;
+import org.apache.nifi.io.socket.multicast.MulticastUtils;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Broadcasts services used by the clustering software using multicast communication.
+ * A configurable delay occurs after broadcasting the collection of services.
+ * 
+ * The client caller is responsible for starting and stopping the broadcasting.
+ * The instance must be stopped before termination of the JVM to ensure proper
+ * resource clean-up.
+ * 
+ * @author unattributed
+ */
+public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster {
+    
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class));
+    
+    private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>();
+
+    private final InetSocketAddress multicastAddress;
+    
+    private final MulticastConfiguration multicastConfiguration;
+    
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+    
+    private final int broadcastDelayMs;
+    
+    private Timer broadcaster;
+    
+    private MulticastSocket multicastSocket;
+    
+    public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, 
+            final MulticastConfiguration multicastConfiguration, 
+            final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) {
+        
+        if(multicastAddress == null) {
+            throw new IllegalArgumentException("Multicast address may not be null.");
+        } else if(multicastAddress.getAddress().isMulticastAddress() == false) {
+            throw new IllegalArgumentException("Multicast group address is not a Class D IP address.");
+        } else if(protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        } else if(multicastConfiguration == null) {
+            throw new IllegalArgumentException("Multicast configuration may not be null.");
+        }
+        
+        this.services.addAll(services);
+        this.multicastAddress = multicastAddress;
+        this.multicastConfiguration = multicastConfiguration;
+        this.protocolContext = protocolContext;
+        this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS);
+    }
+    
+    public void start() throws IOException {
+
+        if(isRunning()) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+        
+        // setup socket
+        multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration);
+        
+        // setup broadcaster
+        broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true);
+        broadcaster.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                for(final DiscoverableService service : services) {
+                    try {
+
+                        final InetSocketAddress serviceAddress = service.getServiceAddress();
+                        logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", 
+                            service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
+                        
+                        // create message
+                        final ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
+                        msg.setServiceName(service.getServiceName());
+                        msg.setAddress(serviceAddress.getHostName());
+                        msg.setPort(serviceAddress.getPort());
+
+                        // marshal message to output stream
+                        final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        marshaller.marshal(msg, baos);
+                        final byte[] packetBytes = baos.toByteArray();
+
+                        // send message
+                        final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress);
+                        multicastSocket.send(packet);
+
+                    } catch(final Exception ex) {
+                        logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex);
+                    }
+                }
+            }
+        }, 0, broadcastDelayMs);
+    }
+    
+    public boolean isRunning() {
+        return (broadcaster != null);
+    }
+    
+    public void stop() {
+        
+        if(isRunning() == false) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+        
+        broadcaster.cancel();
+        broadcaster = null;
+
+        // close socket
+        MulticastUtils.closeQuietly(multicastSocket);
+        
+    }
+
+    @Override
+    public int getBroadcastDelayMs() {
+        return broadcastDelayMs;
+    }
+    
+    @Override
+    public Set<DiscoverableService> getServices() {
+        return Collections.unmodifiableSet(services);
+    }
+
+    @Override
+    public InetSocketAddress getMulticastAddress() {
+        return multicastAddress;
+    }
+    
+    @Override
+    public boolean addService(final DiscoverableService service) {
+        return services.add(service);
+    }
+    
+    @Override
+    public boolean removeService(final String serviceName) {
+        for(final DiscoverableService service : services) {
+            if(service.getServiceName().equals(serviceName)) {
+                return services.remove(service);
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
new file mode 100644
index 0000000..680df65
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * An input stream wrapper that keeps an in-memory copy of the bytes read
+ * through it, up to a fixed maximum number of bytes. Bytes read after the
+ * limit has been reached are still returned to the caller but are not copied.
+ * A limit of zero or less results in nothing being copied.
+ */
+public class CopyingInputStream extends FilterInputStream {
+    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    private final int maxBytesToCopy;
+
+    /**
+     * @param in the stream to read from
+     * @param maxBytesToCopy the maximum number of bytes to retain
+     */
+    public CopyingInputStream(final InputStream in, final int maxBytesToCopy) {
+        super(in);
+        this.maxBytesToCopy = maxBytesToCopy;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int delegateRead = in.read();
+        if ( delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy ) {
+            baos.write(delegateRead);
+        }
+
+        return delegateRead;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        final int delegateRead = in.read(b);
+        copy(b, 0, delegateRead);
+        return delegateRead;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int delegateRead = in.read(b, off, len);
+        copy(b, off, delegateRead);
+        return delegateRead;
+    }
+
+    /**
+     * Retains as many of the just-read bytes as the remaining capacity allows.
+     *
+     * @param b the buffer the bytes were read into
+     * @param off the offset in the buffer of the first byte read
+     * @param numRead the number of bytes read, or a negative value at end of stream
+     */
+    private void copy(final byte[] b, final int off, final int numRead) {
+        if ( numRead < 0 ) {
+            return;
+        }
+
+        // clamp at zero so a non-positive limit copies nothing instead of
+        // handing a negative length to the buffer
+        final int remaining = Math.max(0, maxBytesToCopy - getNumberOfBytesCopied());
+        baos.write(b, off, Math.min(numRead, remaining));
+    }
+
+    /**
+     * @return a copy of the bytes retained so far
+     */
+    public byte[] getBytesRead() {
+        return baos.toByteArray();
+    }
+
+    /**
+     * Writes the bytes retained so far to the given stream.
+     *
+     * @param out the stream to write to
+     * @throws IOException if writing to the stream fails
+     */
+    public void writeBytes(final OutputStream out) throws IOException {
+        baos.writeTo(out);
+    }
+
+    /**
+     * @return the number of bytes retained so far
+     */
+    public int getNumberOfBytesCopied() {
+        return baos.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
new file mode 100644
index 0000000..d3764b3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
+import org.apache.nifi.io.socket.multicast.MulticastListener;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.events.BulletinFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for protocol messages sent over multicast.  If a message
+ * is of type MulticastProtocolMessage, then the underlying protocol message is
+ * passed to the handler.  If the receiving handler produces a message response,
+ * then the message is wrapped with a MulticastProtocolMessage before being
+ * sent to the multicast group.  Messages carrying this listener's own id are
+ * ignored.
+ *
+ * The client caller is responsible for starting and stopping the listener.
+ * The instance must be stopped before termination of the JVM to ensure proper
+ * resource clean-up.
+ *
+ * @author unattributed
+ */
+public class MulticastProtocolListener extends MulticastListener implements ProtocolListener {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class));
+
+    // handlers consulted, in registration order, for every received message
+    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
+    // stamped on outgoing responses so this listener can skip its own messages
+    private final String listenerId = UUID.randomUUID().toString();
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+    // optional; when set, processing failures are also reported as bulletins
+    private volatile BulletinRepository bulletinRepository;
+
+    /**
+     * @param numThreads passed through to the underlying multicast listener
+     * @param multicastAddress the multicast group address and port
+     * @param configuration the multicast configuration
+     * @param protocolContext the context used to marshal and unmarshal messages
+     *
+     * @throws IllegalArgumentException if the protocol context is null
+     */
+    public MulticastProtocolListener(
+            final int numThreads,
+            final InetSocketAddress multicastAddress,
+            final MulticastConfiguration configuration,
+            final ProtocolContext<ProtocolMessage> protocolContext) {
+
+        super(numThreads, multicastAddress, configuration);
+
+        if (protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        }
+        this.protocolContext = protocolContext;
+    }
+
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        this.bulletinRepository = bulletinRepository;
+    }
+
+    /**
+     * @throws IllegalStateException if the listener is already running
+     */
+    @Override
+    public void start() throws IOException {
+
+        if(super.isRunning()) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+
+        super.start();
+
+    }
+
+    /**
+     * @throws IllegalStateException if the listener is not running
+     */
+    @Override
+    public void stop() throws IOException {
+
+        if(super.isRunning() == false) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+
+        // shutdown listener
+        super.stop();
+
+    }
+
+    /**
+     * @return a read-only view of the registered handlers
+     */
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return Collections.unmodifiableCollection(handlers);
+    }
+
+    /**
+     * @throws NullPointerException if the handler is null
+     */
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        if(handler == null) {
+            throw new NullPointerException("Protocol handler may not be null.");
+        }
+        handlers.add(handler);
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        return handlers.remove(handler);
+    }
+
+    /**
+     * Unmarshals the received packet, hands the (unwrapped) protocol message to
+     * the first handler able to process it and, if the handler produces a
+     * response, multicasts that response wrapped with this listener's id.
+     * Any failure is logged and, when a bulletin repository is set, reported
+     * as a bulletin; nothing is propagated to the caller.
+     *
+     * @param multicastSocket the socket used to send a response
+     * @param packet the received packet
+     */
+    @Override
+    public void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) {
+
+        try {
+
+            // unmarshall message
+            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+            final ProtocolMessage request = unmarshaller.unmarshal(new ByteArrayInputStream(packet.getData(), 0, packet.getLength()));
+
+            // unwrap multicast message, if necessary
+            final ProtocolMessage unwrappedRequest;
+            if(request instanceof MulticastProtocolMessage) {
+                final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request;
+                // don't process a message we sent
+                if(listenerId.equals(multicastRequest.getId())) {
+                    return;
+                }
+                unwrappedRequest = multicastRequest.getProtocolMessage();
+            } else {
+                unwrappedRequest = request;
+            }
+
+            // if no handler found, throw exception; otherwise handle request
+            final ProtocolHandler desiredHandler = findHandler(unwrappedRequest);
+            if (desiredHandler == null) {
+                throw new ProtocolException("No handler assigned to handle message type: " + unwrappedRequest.getType());
+            }
+
+            // hand over the same message the handler was selected for, not the multicast wrapper
+            final ProtocolMessage response = desiredHandler.handle(unwrappedRequest);
+            if(response == null) {
+                return;
+            }
+
+            try {
+
+                // wrap with listener id
+                final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response);
+
+                // marshal message
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                marshaller.marshal(multicastResponse, baos);
+                final byte[] responseBytes = baos.toByteArray();
+
+                // an oversized response is only warned about; sending is still attempted
+                final int maxPacketSizeBytes = getMaxPacketSizeBytes();
+                if(responseBytes.length > maxPacketSizeBytes) {
+                    logger.warn("Cluster protocol handler '" + desiredHandler.getClass() +
+                        "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
+                }
+
+                // create and send packet
+                final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort());
+                multicastSocket.send(responseDatagram);
+
+            } catch (final IOException ioe) {
+                throw new ProtocolException("Failed marshalling protocol message in response to message type: " + unwrappedRequest.getType() + " due to: " + ioe, ioe);
+            }
+
+        } catch (final Throwable t) {
+            logger.warn("Failed processing protocol message due to " + t, t);
+
+            if ( bulletinRepository != null ) {
+                final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString());
+                bulletinRepository.addBulletin(bulletin);
+            }
+        }
+    }
+
+    /**
+     * Finds the first registered handler that reports it can handle the message.
+     *
+     * @param message the message to find a handler for
+     * @return the handler, or null if no registered handler can handle the message
+     */
+    private ProtocolHandler findHandler(final ProtocolMessage message) {
+        for (final ProtocolHandler handler : getHandlers()) {
+            if (handler.canHandle(message)) {
+                return handler;
+            }
+        }
+        return null;
+    }
+}