You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:40 UTC

[17/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
new file mode 100644
index 0000000..dc86d24
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.ssl.SSLSocket;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+
+public class NodeProtocolSenderImpl implements NodeProtocolSender {
+    private final SocketConfiguration socketConfiguration;
+    private final ClusterServiceLocator clusterManagerProtocolServiceLocator;
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+    
+    public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator, 
+            final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
+        if(clusterManagerProtocolServiceLocator == null) {
+            throw new IllegalArgumentException("Protocol Service Locator may not be null.");
+        } else if(socketConfiguration == null) {
+            throw new IllegalArgumentException("Socket configuration may not be null.");
+        } else if(protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        }
+        
+        this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator;
+        this.socketConfiguration = socketConfiguration;
+        this.protocolContext = protocolContext;
+    }
+    
+    
+    @Override
+    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        Socket socket = null;
+        try {
+            socket = createSocket();
+            
+            String ncmDn = null;
+            if ( socket instanceof SSLSocket ) {
+                final SSLSocket sslSocket = (SSLSocket) socket;
+                try {
+                    final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
+                    if ( certChains != null && certChains.length > 0 ) {
+                        ncmDn = certChains[0].getSubjectDN().getName();
+                    }
+                } catch (final ProtocolException pe) {
+                    throw pe;
+                } catch (final Exception e) {
+                    throw new ProtocolException(e);
+                }
+            }
+            
+            try {
+                // marshal message to output stream
+                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+            } 
+            
+            final ProtocolMessage response;
+            try {
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+                response = unmarshaller.unmarshal(socket.getInputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
+            } 
+            
+            if(MessageType.CONNECTION_RESPONSE == response.getType()) {
+                final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response;
+                connectionResponse.setClusterManagerDN(ncmDn);
+                return connectionResponse;
+            } else {
+                throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+    
+    
+    @Override
+    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    @Override
+    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    @Override
+    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    @Override
+    public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+    
+    private Socket createSocket() {
+        // determine the cluster manager's address
+        final DiscoverableService service = clusterManagerProtocolServiceLocator.getService(); 
+        if(service == null) {
+            throw new UnknownServiceAddressException("Cluster Manager's service is not known.  Verify a cluster manager is running.");
+        }
+        
+        try {
+            // create a socket
+            return SocketUtils.createSocket(service.getServiceAddress(), socketConfiguration); 
+        } catch(final IOException ioe) {
+            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
+        }
+    }
+    
+    private void sendProtocolMessage(final ProtocolMessage msg) {
+        Socket socket = null;
+        try {
+            socket = createSocket();
+
+            try {
+                // marshal message to output stream
+                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch(final IOException ioe) {
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+    
+    public SocketConfiguration getSocketConfiguration() {
+        return socketConfiguration;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
new file mode 100644
index 0000000..4b359f4
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.reporting.BulletinRepository;
+
+/**
+ * Combines a {@link NodeProtocolSender} and a {@link ProtocolListener} behind a
+ * single object. Sending operations are forwarded to the wrapped sender;
+ * lifecycle, handler and bulletin-repository operations are forwarded to the
+ * wrapped listener.
+ */
+public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
+
+    private final NodeProtocolSender sender;
+    private final ProtocolListener listener;
+
+    /**
+     * @param sender the delegate for all sending operations
+     * @param listener the delegate for all listening operations
+     * @throws IllegalArgumentException if either argument is null
+     */
+    public NodeProtocolSenderListener(final NodeProtocolSender sender, final ProtocolListener listener) {
+        if (sender == null) {
+            throw new IllegalArgumentException("NodeProtocolSender may not be null.");
+        }
+        if (listener == null) {
+            throw new IllegalArgumentException("ProtocolListener may not be null.");
+        }
+
+        this.sender = sender;
+        this.listener = listener;
+    }
+
+    // ---- ProtocolListener: lifecycle ----
+
+    /**
+     * Starts the wrapped listener.
+     *
+     * @throws IllegalStateException if this instance is already running
+     */
+    @Override
+    public void start() throws IOException {
+        final boolean alreadyRunning = isRunning();
+        if (alreadyRunning) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+        listener.start();
+    }
+
+    /**
+     * Stops the wrapped listener.
+     *
+     * @throws IllegalStateException if this instance is not running
+     */
+    @Override
+    public void stop() throws IOException {
+        final boolean running = isRunning();
+        if (!running) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+        listener.stop();
+    }
+
+    /** @return whether the wrapped listener reports that it is running */
+    @Override
+    public boolean isRunning() {
+        return listener.isRunning();
+    }
+
+    // ---- ProtocolListener: handlers and bulletins ----
+
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        listener.addHandler(handler);
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        return listener.removeHandler(handler);
+    }
+
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return listener.getHandlers();
+    }
+
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        listener.setBulletinRepository(bulletinRepository);
+    }
+
+    // ---- NodeProtocolSender ----
+
+    @Override
+    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        return sender.requestConnection(msg);
+    }
+
+    @Override
+    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.heartbeat(msg);
+    }
+
+    @Override
+    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.sendBulletins(msg);
+    }
+
+    @Override
+    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.notifyControllerStartupFailure(msg);
+    }
+
+    @Override
+    public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.notifyReconnectionFailure(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
new file mode 100644
index 0000000..ca30d9b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLSocket;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketListener;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.StopWatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for protocol messages sent over unicast socket.
+ * Each incoming request is unmarshalled, handed to the first registered
+ * handler that accepts it, and any non-null response from that handler is
+ * marshalled back over the same socket.
+ *
+ * @author unattributed
+ */
+public class SocketProtocolListener extends SocketListener implements ProtocolListener {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class));
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
+    private volatile BulletinRepository bulletinRepository;
+
+    /**
+     * @param numThreads passed through to {@link SocketListener}
+     * @param port passed through to {@link SocketListener}
+     * @param configuration passed through to {@link SocketListener}
+     * @param protocolContext source of the marshaller and unmarshaller used for messages
+     * @throws IllegalArgumentException if protocolContext is null
+     */
+    public SocketProtocolListener(
+            final int numThreads,
+            final int port,
+            final ServerSocketConfiguration configuration,
+            final ProtocolContext<ProtocolMessage> protocolContext) {
+
+        super(numThreads, port, configuration);
+
+        if(protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        }
+
+        this.protocolContext = protocolContext;
+    }
+
+    /**
+     * Sets the repository that receives a bulletin whenever a request fails;
+     * when null, failures are only logged.
+     */
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        this.bulletinRepository = bulletinRepository;
+    }
+
+    /**
+     * @throws IllegalStateException if the listener is already running
+     */
+    @Override
+    public void start() throws IOException {
+
+        if(super.isRunning()) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+
+        super.start();
+    }
+
+    /**
+     * @throws IOException if the listener is not running (note that start()
+     * signals the equivalent condition with an IllegalStateException)
+     */
+    @Override
+    public void stop() throws IOException {
+
+        if(super.isRunning() == false) {
+            throw new IOException("Instance is already stopped.");
+        }
+
+        super.stop();
+
+    }
+
+    /**
+     * @return an unmodifiable view of the registered handlers
+     */
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return Collections.unmodifiableCollection(handlers);
+    }
+
+    /**
+     * @throws NullPointerException if handler is null
+     */
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        if(handler == null) {
+            throw new NullPointerException("Protocol handler may not be null.");
+        }
+        handlers.add(handler);
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        return handlers.remove(handler);
+    }
+
+    /**
+     * Processes one request arriving on the given socket. An IOException or
+     * ProtocolException raised while processing is logged and, when a
+     * bulletin repository is set, reported as a bulletin; it is not rethrown.
+     *
+     * @param socket the socket the request arrives on
+     */
+    @Override
+    public void dispatchRequest(final Socket socket) {
+        byte[] receivedMessage = null;
+        String hostname = null;
+        final int maxMsgBuffer = 1024 * 1024;   // don't buffer more than 1 MB of the message
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            hostname = socket.getInetAddress().getHostName();
+            final String requestId = UUID.randomUUID().toString();
+            logger.info("Received request {} from {}", requestId, hostname);
+
+            final String requestorDn = getRequestorDn(socket);
+
+            // unmarshall message
+            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+            final InputStream inStream = socket.getInputStream();
+            final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB
+
+            final ProtocolMessage request;
+            try {
+                request = unmarshaller.unmarshal(copyingInputStream);
+            } finally {
+                receivedMessage = copyingInputStream.getBytesRead();
+                // logged only after the stream has been consumed; before unmarshalling nothing has been read yet
+                logger.debug("Request {} has a message length of {}", requestId, copyingInputStream.getNumberOfBytesCopied());
+            }
+
+            request.setRequestorDN(requestorDn);
+
+            // dispatch message to the first handler that accepts it
+            ProtocolHandler desiredHandler = null;
+            for (final ProtocolHandler handler : getHandlers()) {
+                if (handler.canHandle(request)) {
+                    desiredHandler = handler;
+                    break;
+                }
+            }
+
+            // if no handler found, throw exception; otherwise handle request
+            if (desiredHandler == null) {
+                throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
+            }
+
+            final ProtocolMessage response = desiredHandler.handle(request);
+            if(response != null) {
+                try {
+                    logger.debug("Sending response for request {}", requestId);
+
+                    // marshal message to output stream
+                    final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                    marshaller.marshal(response, socket.getOutputStream());
+                } catch (final IOException ioe) {
+                    throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to " + ioe, ioe);
+                }
+            }
+
+            stopWatch.stop();
+            logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+        } catch (final IOException | ProtocolException e) {
+            reportFailure(hostname, e);
+        }
+    }
+
+    /**
+     * Returns the subject DN of the peer's certificate, or null when the socket
+     * is not an SSL socket or the peer presented no X.509 certificate.
+     *
+     * @throws ProtocolException if the peer's certificates cannot be obtained
+     */
+    private String getRequestorDn(final Socket socket) {
+        if(!(socket instanceof SSLSocket)) {
+            return null;
+        }
+
+        try {
+            // getPeerCertificates() is used instead of the deprecated getPeerCertificateChain(),
+            // which newer JDKs no longer support; the peer's own certificate comes first
+            final java.security.cert.Certificate[] certChain = ((SSLSocket) socket).getSession().getPeerCertificates();
+            if(certChain != null && certChain.length > 0 && certChain[0] instanceof java.security.cert.X509Certificate) {
+                // NOTE(review): getSubjectDN() is kept (rather than getSubjectX500Principal()) so the
+                // DN string is expected to be formatted as before -- confirm against consumers of the DN
+                return ((java.security.cert.X509Certificate) certChain[0]).getSubjectDN().getName();
+            }
+            return null;
+        } catch (final Exception e) {
+            throw new ProtocolException(e);
+        }
+    }
+
+    /** Logs a failed request and, when a bulletin repository is set, records a bulletin for it. */
+    private void reportFailure(final String hostname, final Exception e) {
+        logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
+
+        // read the volatile field once so the null check and the use see the same reference
+        final BulletinRepository repository = bulletinRepository;
+        if ( repository != null ) {
+            final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
+            repository.addBulletin(bulletin);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
new file mode 100644
index 0000000..bc68630
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+
+/**
+ * Implements a context for communicating internally amongst the cluster using
+ * JAXB.
+ *
+ * On the wire a message consists of one sentinel byte, the payload length as
+ * written by {@link DataOutputStream#writeInt(int)}, and then the
+ * JAXB-marshalled payload bytes.
+ *
+ * @param <T> The type of protocol message.
+ *
+ * @author unattributed
+ */
+public class JaxbProtocolContext<T> implements ProtocolContext<T> {
+
+    /*
+     * A sentinel is used to detect corrupted messages.  Relying on the integrity
+     * of the message size can cause memory issues if the value is corrupted
+     * and equal to a number larger than the memory size.
+     */
+    private static final byte MESSAGE_PROTOCOL_START_SENTINEL = 0x5A;
+
+    private final JAXBContext jaxbCtx;
+
+    /**
+     * @param jaxbCtx the JAXB context used to create marshallers and unmarshallers
+     */
+    public JaxbProtocolContext(final JAXBContext jaxbCtx) {
+        this.jaxbCtx = jaxbCtx;
+    }
+
+    /**
+     * @return a marshaller that writes the sentinel, the payload length and
+     * the JAXB-marshalled payload to the given stream and then flushes it
+     */
+    @Override
+    public ProtocolMessageMarshaller<T> createMarshaller() {
+        return new ProtocolMessageMarshaller<T>() {
+
+            @Override
+            public void marshal(final T msg, final OutputStream os) throws IOException {
+
+                try {
+
+                    // marshal into memory first so the payload size is known before anything is written
+                    final Marshaller marshaller = jaxbCtx.createMarshaller();
+                    final ByteArrayOutputStream msgBytes = new ByteArrayOutputStream();
+                    marshaller.marshal(msg, msgBytes);
+
+                    final DataOutputStream dos = new DataOutputStream(os);
+
+                    // write message protocol sentinel
+                    dos.write(MESSAGE_PROTOCOL_START_SENTINEL);
+
+                    // write message size in bytes
+                    dos.writeInt(msgBytes.size());
+
+                    // write message straight from the in-memory buffer, without an intermediate array copy
+                    msgBytes.writeTo(dos);
+
+                    dos.flush();
+
+                } catch (final JAXBException je) {
+                    throw new IOException("Failed marshalling protocol message due to: " + je, je);
+                }
+
+            }
+        };
+    }
+
+    /**
+     * @return an unmarshaller that reads one message in the format produced by
+     * {@link #createMarshaller()}. It throws EOFException when the stream ends
+     * before a complete message has been read, and IOException when the
+     * sentinel or length is invalid or JAXB unmarshalling fails.
+     */
+    @Override
+    public ProtocolMessageUnmarshaller<T> createUnmarshaller() {
+        return new ProtocolMessageUnmarshaller<T>() {
+
+            @Override
+            public T unmarshal(final InputStream is) throws IOException {
+
+                try {
+
+                    final DataInputStream dis = new DataInputStream(is);
+
+                    // keep the value as an int so end-of-stream (-1) cannot be confused with a 0xFF data byte
+                    final int sentinel = dis.read();
+                    if ( sentinel == -1 ) {
+                        throw new EOFException();
+                    }
+
+                    if(MESSAGE_PROTOCOL_START_SENTINEL != (byte) sentinel) {
+                        throw new IOException("Failed reading protocol message due to malformed header");
+                    }
+
+                    // read the message size; a negative value can only come from a corrupted header
+                    final int msgBytesSize = dis.readInt();
+                    if(msgBytesSize < 0) {
+                        throw new IOException("Failed reading protocol message due to invalid message size: " + msgBytesSize);
+                    }
+
+                    // read the message; readFully throws EOFException if the stream ends early
+                    final byte[] msg = new byte[msgBytesSize];
+                    dis.readFully(msg);
+
+                    // unmarshall message and return
+                    final Unmarshaller unmarshaller = jaxbCtx.createUnmarshaller();
+                    @SuppressWarnings("unchecked")  // the JAXB context determines the runtime type; it cannot be checked here
+                    final T result = (T) unmarshaller.unmarshal(new ByteArrayInputStream(msg));
+                    return result;
+
+                } catch (final JAXBException je) {
+                    throw new IOException("Failed unmarshalling protocol message due to: " + je, je);
+                }
+
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
new file mode 100644
index 0000000..d9de24f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Mutable, bean-style holder for the {@link NodeIdentifier} carried by a
+ * connection request. {@link ConnectionRequestAdapter} converts between this
+ * form and {@code ConnectionRequest}.
+ *
+ * @author unattributed
+ */
+public class AdaptedConnectionRequest {
+
+    /** Node identifier of the request; {@code null} until a value is set. */
+    private NodeIdentifier nodeIdentifier;
+
+    /**
+     * Creates an instance whose node identifier is unset.
+     */
+    public AdaptedConnectionRequest() {
+    }
+
+    /**
+     * Returns the held node identifier. The accessor is annotated so that the
+     * value is bound through {@link NodeIdentifierAdapter}.
+     *
+     * @return the node identifier, or {@code null} if none has been set
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * Stores the node identifier.
+     *
+     * @param nodeIdentifier the identifier to hold; may be {@code null}
+     */
+    public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
new file mode 100644
index 0000000..c7c783b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * Mutable, bean-style form of a connection response.
+ * {@link ConnectionResponseAdapter} copies values between this class and
+ * {@code ConnectionResponse}.
+ *
+ * @author unattributed
+ */
+public class AdaptedConnectionResponse {
+
+    private StandardDataFlow dataFlow;
+    private NodeIdentifier nodeIdentifier;
+    private boolean blockedByFirewall;
+    private boolean primary;
+    private int tryLaterSeconds;
+    private Integer managerRemoteInputPort;
+    private Boolean managerRemoteCommsSecure;
+    private String instanceId;
+
+    /**
+     * Creates an instance with every reference unset, both flags {@code false}
+     * and a try-later delay of zero.
+     */
+    public AdaptedConnectionResponse() {
+    }
+
+    /**
+     * Returns the data flow; bound through {@link DataFlowAdapter}.
+     *
+     * @return the data flow, or {@code null} if none has been set
+     */
+    @XmlJavaTypeAdapter(DataFlowAdapter.class)
+    public StandardDataFlow getDataFlow() {
+        return this.dataFlow;
+    }
+
+    /**
+     * @param dataFlow the data flow to hold; may be {@code null}
+     */
+    public void setDataFlow(final StandardDataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
+
+    /**
+     * Returns the node identifier; bound through {@link NodeIdentifierAdapter}.
+     *
+     * @return the node identifier, or {@code null} if none has been set
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * @param nodeIdentifier the node identifier to hold; may be {@code null}
+     */
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    /**
+     * @return the try-later delay in seconds
+     */
+    public int getTryLaterSeconds() {
+        return this.tryLaterSeconds;
+    }
+
+    /**
+     * @param tryLaterSeconds the try-later delay in seconds; a positive value
+     * makes {@link #shouldTryLater()} report {@code true}
+     */
+    public void setTryLaterSeconds(final int tryLaterSeconds) {
+        this.tryLaterSeconds = tryLaterSeconds;
+    }
+
+    /**
+     * @return the blocked-by-firewall flag
+     */
+    public boolean isBlockedByFirewall() {
+        return this.blockedByFirewall;
+    }
+
+    /**
+     * @param blockedByFirewall the blocked-by-firewall flag
+     */
+    public void setBlockedByFirewall(final boolean blockedByFirewall) {
+        this.blockedByFirewall = blockedByFirewall;
+    }
+
+    /**
+     * @return the primary flag
+     */
+    public boolean isPrimary() {
+        return this.primary;
+    }
+
+    /**
+     * @param primary the primary flag
+     */
+    public void setPrimary(final boolean primary) {
+        this.primary = primary;
+    }
+
+    /**
+     * Reports whether a try-later delay has been set.
+     *
+     * @return {@code true} when the try-later delay is greater than zero
+     */
+    public boolean shouldTryLater() {
+        return this.tryLaterSeconds > 0;
+    }
+
+    /**
+     * @param managerRemoteInputPort the manager's remote input port; may be {@code null}
+     */
+    public void setManagerRemoteInputPort(final Integer managerRemoteInputPort) {
+        this.managerRemoteInputPort = managerRemoteInputPort;
+    }
+
+    /**
+     * @return the manager's remote input port, or {@code null} if none has been set
+     */
+    public Integer getManagerRemoteInputPort() {
+        return this.managerRemoteInputPort;
+    }
+
+    /**
+     * @param secure the manager remote-comms-secure value; may be {@code null}
+     */
+    public void setManagerRemoteCommsSecure(final Boolean secure) {
+        this.managerRemoteCommsSecure = secure;
+    }
+
+    /**
+     * @return the manager remote-comms-secure value, or {@code null} if none has been set
+     */
+    public Boolean isManagerRemoteCommsSecure() {
+        return this.managerRemoteCommsSecure;
+    }
+
+    /**
+     * @param instanceId the instance id; may be {@code null}
+     */
+    public void setInstanceId(final String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    /**
+     * @return the instance id, or {@code null} if none has been set
+     */
+    public String getInstanceId() {
+        return this.instanceId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
new file mode 100644
index 0000000..89d903b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+/**
+ * Mutable, bean-style holder for a counter's group name, name and value.
+ *
+ * @author unattributed
+ */
+public class AdaptedCounter {
+
+    /** Group name of the counter; {@code null} until set. */
+    private String groupName;
+
+    /** Name of the counter; {@code null} until set. */
+    private String name;
+
+    /** Value of the counter; zero until set. */
+    private long value;
+
+    /**
+     * Creates a counter with no group name, no name and a value of zero.
+     */
+    public AdaptedCounter() {
+    }
+
+    /**
+     * @return the group name, or {@code null} if none has been set
+     */
+    public String getGroupName() {
+        return this.groupName;
+    }
+
+    /**
+     * @param counterGroupName the group name to hold; may be {@code null}
+     */
+    public void setGroupName(final String counterGroupName) {
+        this.groupName = counterGroupName;
+    }
+
+    /**
+     * @return the counter name, or {@code null} if none has been set
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param counterName the counter name to hold; may be {@code null}
+     */
+    public void setName(final String counterName) {
+        this.name = counterName;
+    }
+
+    /**
+     * @return the counter value
+     */
+    public long getValue() {
+        return this.value;
+    }
+
+    /**
+     * @param value the counter value to hold
+     */
+    public void setValue(final long value) {
+        this.value = value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
new file mode 100644
index 0000000..bb97619
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+/**
+ * Mutable, bean-style form of a data flow: three byte arrays (flow, templates,
+ * snippets) and an auto-start flag. {@link DataFlowAdapter} converts between
+ * this class and {@code StandardDataFlow}.
+ *
+ * <p>The arrays are stored and returned by reference; no copies are made.</p>
+ *
+ * @author unattributed
+ */
+public class AdaptedDataFlow {
+
+    private byte[] flow;
+    private byte[] templates;
+    private byte[] snippets;
+
+    private boolean autoStartProcessors;
+
+    /**
+     * Creates an instance with all arrays unset and the auto-start flag {@code false}.
+     */
+    public AdaptedDataFlow() {
+    }
+
+    /**
+     * @return the flow bytes (the stored array itself), or {@code null} if unset
+     */
+    public byte[] getFlow() {
+        return this.flow;
+    }
+
+    /**
+     * @param flow the flow bytes to hold; the array is not copied
+     */
+    public void setFlow(final byte[] flow) {
+        this.flow = flow;
+    }
+
+    /**
+     * @return the template bytes (the stored array itself), or {@code null} if unset
+     */
+    public byte[] getTemplates() {
+        return this.templates;
+    }
+
+    /**
+     * @param templates the template bytes to hold; the array is not copied
+     */
+    public void setTemplates(final byte[] templates) {
+        this.templates = templates;
+    }
+
+    /**
+     * @return the snippet bytes (the stored array itself), or {@code null} if unset
+     */
+    public byte[] getSnippets() {
+        return this.snippets;
+    }
+
+    /**
+     * @param snippets the snippet bytes to hold; the array is not copied
+     */
+    public void setSnippets(final byte[] snippets) {
+        this.snippets = snippets;
+    }
+
+    /**
+     * @return the auto-start-processors flag
+     */
+    public boolean isAutoStartProcessors() {
+        return this.autoStartProcessors;
+    }
+
+    /**
+     * @param runningAllProcessors the auto-start-processors flag
+     */
+    public void setAutoStartProcessors(final boolean runningAllProcessors) {
+        this.autoStartProcessors = runningAllProcessors;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
new file mode 100644
index 0000000..5b9d9b7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Mutable, bean-style form of a heartbeat: a node identifier, a payload and
+ * the primary and connected flags. {@link HeartbeatAdapter} converts between
+ * this class and {@code Heartbeat}.
+ *
+ * @author unattributed
+ */
+public class AdaptedHeartbeat {
+
+    private NodeIdentifier nodeIdentifier;
+    private byte[] payload;
+    private boolean primary;
+    private boolean connected;
+
+    /**
+     * Creates an instance with no node identifier, no payload and both flags {@code false}.
+     */
+    public AdaptedHeartbeat() {
+    }
+
+    /**
+     * Returns the node identifier; bound through {@link NodeIdentifierAdapter}.
+     *
+     * @return the node identifier, or {@code null} if none has been set
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * @param nodeIdentifier the node identifier to hold; may be {@code null}
+     */
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    /**
+     * @return the primary flag
+     */
+    public boolean isPrimary() {
+        return this.primary;
+    }
+
+    /**
+     * @param primary the primary flag
+     */
+    public void setPrimary(final boolean primary) {
+        this.primary = primary;
+    }
+
+    /**
+     * @return the connected flag
+     */
+    public boolean isConnected() {
+        return this.connected;
+    }
+
+    /**
+     * @param connected the connected flag
+     */
+    public void setConnected(final boolean connected) {
+        this.connected = connected;
+    }
+
+    /**
+     * @return the payload bytes (the stored array itself), or {@code null} if unset
+     */
+    public byte[] getPayload() {
+        return this.payload;
+    }
+
+    /**
+     * @param payload the payload bytes to hold; the array is not copied
+     */
+    public void setPayload(final byte[] payload) {
+        this.payload = payload;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
new file mode 100644
index 0000000..98e2438
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Mutable, bean-style form of a node's bulletins: a node identifier plus a
+ * byte payload. {@link NodeBulletinsAdapter} converts between this class and
+ * {@code NodeBulletins}.
+ *
+ * @author unattributed
+ */
+public class AdaptedNodeBulletins {
+
+    /** Identifier of the node; {@code null} until set. */
+    private NodeIdentifier nodeIdentifier;
+
+    /** Payload bytes; {@code null} until set. */
+    private byte[] payload;
+
+    /**
+     * Creates an instance with no node identifier and no payload.
+     */
+    public AdaptedNodeBulletins() {
+    }
+
+    /**
+     * Returns the node identifier; bound through {@link NodeIdentifierAdapter}.
+     *
+     * @return the node identifier, or {@code null} if none has been set
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * @param nodeIdentifier the node identifier to hold; may be {@code null}
+     */
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    /**
+     * @return the payload bytes (the stored array itself), or {@code null} if unset
+     */
+    public byte[] getPayload() {
+        return this.payload;
+    }
+
+    /**
+     * @param payload the payload bytes to hold; the array is not copied
+     */
+    public void setPayload(final byte[] payload) {
+        this.payload = payload;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
new file mode 100644
index 0000000..8134ea3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+/**
+ * Mutable, bean-style holder for the parts of a node identifier: an id, an
+ * API address and port, and a socket address and port.
+ *
+ * @author unattributed
+ */
+public class AdaptedNodeIdentifier {
+
+    /** Identifier string; {@code null} until set. */
+    private String id;
+
+    /** API address; {@code null} until set. */
+    private String apiAddress;
+
+    /** API port; zero until set. */
+    private int apiPort;
+
+    /** Socket address; {@code null} until set. */
+    private String socketAddress;
+
+    /** Socket port; zero until set. */
+    private int socketPort;
+
+    /**
+     * Creates an instance with all strings unset and both ports zero.
+     */
+    public AdaptedNodeIdentifier() {
+    }
+
+    /**
+     * @return the API address, or {@code null} if none has been set
+     */
+    public String getApiAddress() {
+        return this.apiAddress;
+    }
+
+    /**
+     * @param apiAddress the API address to hold; may be {@code null}
+     */
+    public void setApiAddress(final String apiAddress) {
+        this.apiAddress = apiAddress;
+    }
+
+    /**
+     * @return the API port
+     */
+    public int getApiPort() {
+        return this.apiPort;
+    }
+
+    /**
+     * @param apiPort the API port to hold
+     */
+    public void setApiPort(final int apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    /**
+     * @return the id, or {@code null} if none has been set
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id the id to hold; may be {@code null}
+     */
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the socket address, or {@code null} if none has been set
+     */
+    public String getSocketAddress() {
+        return this.socketAddress;
+    }
+
+    /**
+     * @param socketAddress the socket address to hold; may be {@code null}
+     */
+    public void setSocketAddress(final String socketAddress) {
+        this.socketAddress = socketAddress;
+    }
+
+    /**
+     * @return the socket port
+     */
+    public int getSocketPort() {
+        return this.socketPort;
+    }
+
+    /**
+     * @param socketPort the socket port to hold
+     */
+    public void setSocketPort(final int socketPort) {
+        this.socketPort = socketPort;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
new file mode 100644
index 0000000..1f91cf1
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+
+/**
+ * {@link XmlAdapter} that converts between {@link ConnectionRequest} and its
+ * bean-style form, {@link AdaptedConnectionRequest}.
+ *
+ * @author unattributed
+ */
+public class ConnectionRequestAdapter extends XmlAdapter<AdaptedConnectionRequest, ConnectionRequest> {
+
+    /**
+     * Copies the proposed node identifier of the request into a new adapted object.
+     *
+     * @param cr the request to convert; may be {@code null}
+     * @return a new adapted request; its node identifier is left unset when
+     * {@code cr} is {@code null}
+     */
+    @Override
+    public AdaptedConnectionRequest marshal(final ConnectionRequest cr) {
+        final AdaptedConnectionRequest adapted = new AdaptedConnectionRequest();
+        if (cr == null) {
+            // nothing to copy; hand back the empty adapted object
+            return adapted;
+        }
+        adapted.setNodeIdentifier(cr.getProposedNodeIdentifier());
+        return adapted;
+    }
+
+    /**
+     * Builds a {@link ConnectionRequest} from the adapted object's node identifier.
+     *
+     * @param aCr the adapted request to convert
+     * @return a new connection request
+     */
+    @Override
+    public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) {
+        final ConnectionRequest request = new ConnectionRequest(aCr.getNodeIdentifier());
+        return request;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
new file mode 100644
index 0000000..143bab0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+
+/**
+ * {@link XmlAdapter} that converts between {@link ConnectionResponse} and its
+ * bean-style form, {@link AdaptedConnectionResponse}.
+ *
+ * @author unattributed
+ */
+public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionResponse, ConnectionResponse> {
+
+    /**
+     * Copies every value of the response into a new adapted object.
+     *
+     * @param cr the response to convert; may be {@code null}
+     * @return a new adapted response; all of its values are left at their
+     * defaults when {@code cr} is {@code null}
+     */
+    @Override
+    public AdaptedConnectionResponse marshal(final ConnectionResponse cr) {
+        final AdaptedConnectionResponse adapted = new AdaptedConnectionResponse();
+        if (cr == null) {
+            // nothing to copy; hand back the empty adapted object
+            return adapted;
+        }
+
+        adapted.setDataFlow(cr.getDataFlow());
+        adapted.setNodeIdentifier(cr.getNodeIdentifier());
+        adapted.setTryLaterSeconds(cr.getTryLaterSeconds());
+        adapted.setBlockedByFirewall(cr.isBlockedByFirewall());
+        adapted.setPrimary(cr.isPrimary());
+        adapted.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
+        adapted.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
+        adapted.setInstanceId(cr.getInstanceId());
+        return adapted;
+    }
+
+    /**
+     * Builds a {@link ConnectionResponse} from the adapted object. A try-later
+     * delay is checked first, then the blocked-by-firewall flag; only when
+     * neither applies is a response built from the remaining values.
+     *
+     * @param aCr the adapted response to convert
+     * @return the resulting connection response
+     */
+    @Override
+    public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) {
+        if (aCr.shouldTryLater()) {
+            return new ConnectionResponse(aCr.getTryLaterSeconds());
+        }
+
+        if (aCr.isBlockedByFirewall()) {
+            return ConnectionResponse.createBlockedByFirewallResponse();
+        }
+
+        return new ConnectionResponse(
+                aCr.getNodeIdentifier(),
+                aCr.getDataFlow(),
+                aCr.isPrimary(),
+                aCr.getManagerRemoteInputPort(),
+                aCr.isManagerRemoteCommsSecure(),
+                aCr.getInstanceId());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
new file mode 100644
index 0000000..8d9467f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * {@link XmlAdapter} that converts between {@link StandardDataFlow} and its
+ * bean-style form, {@link AdaptedDataFlow}.
+ *
+ * @author unattributed
+ */
+public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlow> {
+
+    /**
+     * Copies the flow, templates, snippets and auto-start flag of the data flow
+     * into a new adapted object.
+     *
+     * @param df the data flow to convert; may be {@code null}
+     * @return a new adapted data flow; all of its values are left at their
+     * defaults when {@code df} is {@code null}
+     */
+    @Override
+    public AdaptedDataFlow marshal(final StandardDataFlow df) {
+        final AdaptedDataFlow adapted = new AdaptedDataFlow();
+        if (df == null) {
+            // nothing to copy; hand back the empty adapted object
+            return adapted;
+        }
+
+        adapted.setFlow(df.getFlow());
+        adapted.setTemplates(df.getTemplates());
+        adapted.setSnippets(df.getSnippets());
+        adapted.setAutoStartProcessors(df.isAutoStartProcessors());
+        return adapted;
+    }
+
+    /**
+     * Builds a {@link StandardDataFlow} from the adapted object's flow,
+     * templates and snippets, then applies its auto-start flag.
+     *
+     * @param aDf the adapted data flow to convert
+     * @return a new data flow
+     */
+    @Override
+    public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) {
+        final byte[] flow = aDf.getFlow();
+        final byte[] templates = aDf.getTemplates();
+        final byte[] snippets = aDf.getSnippets();
+
+        final StandardDataFlow result = new StandardDataFlow(flow, templates, snippets);
+        result.setAutoStartProcessors(aDf.isAutoStartProcessors());
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
new file mode 100644
index 0000000..0e073b6
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+
+/**
+ * {@link XmlAdapter} that converts between {@link Heartbeat} and its
+ * bean-style form, {@link AdaptedHeartbeat}.
+ *
+ * @author unattributed
+ */
+public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
+
+    /**
+     * Copies the node identifier, payload, primary flag and connected flag of
+     * the heartbeat into a new adapted object.
+     *
+     * @param hb the heartbeat to convert; may be {@code null}
+     * @return a new adapted heartbeat; all of its values are left at their
+     * defaults when {@code hb} is {@code null}
+     */
+    @Override
+    public AdaptedHeartbeat marshal(final Heartbeat hb) {
+        final AdaptedHeartbeat adapted = new AdaptedHeartbeat();
+        if (hb == null) {
+            // nothing to copy; hand back the empty adapted object
+            return adapted;
+        }
+
+        adapted.setNodeIdentifier(hb.getNodeIdentifier());
+        adapted.setPayload(hb.getPayload());
+        adapted.setPrimary(hb.isPrimary());
+        adapted.setConnected(hb.isConnected());
+        return adapted;
+    }
+
+    /**
+     * Builds a {@link Heartbeat} from the adapted object's node identifier,
+     * primary flag, connected flag and payload.
+     *
+     * @param aHb the adapted heartbeat to convert
+     * @return a new heartbeat
+     */
+    @Override
+    public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
+        return new Heartbeat(
+                aHb.getNodeIdentifier(),
+                aHb.isPrimary(),
+                aHb.isConnected(),
+                aHb.getPayload());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
new file mode 100644
index 0000000..c3a57f5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+/**
+ * Holds a {@link JAXBContext} created once, during class initialization, for
+ * the package that contains {@link ObjectFactory}.
+ *
+ * @author unattributed
+ */
+public final class JaxbProtocolUtils {
+
+    /** Name of the package containing {@link ObjectFactory}; used as the JAXB context path. */
+    public static final String JAXB_CONTEXT_PATH = ObjectFactory.class.getPackage().getName();
+
+    /** Context created for {@link #JAXB_CONTEXT_PATH} when this class is initialized. */
+    public static final JAXBContext JAXB_CONTEXT;
+
+    // Runs after JAXB_CONTEXT_PATH has been assigned, so the path is available here.
+    // A JAXBException is rethrown unchecked because a static initializer cannot
+    // propagate checked exceptions.
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(JAXB_CONTEXT_PATH);
+        } catch (final JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
new file mode 100644
index 0000000..1ae41f7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.NodeBulletins;
+
+/**
+ * JAXB adapter that converts between {@link NodeBulletins} (the bound type) and
+ * {@link AdaptedNodeBulletins} (the value type).
+ *
+ * @author unattributed
+ */
+public class NodeBulletinsAdapter extends XmlAdapter<AdaptedNodeBulletins, NodeBulletins> {
+
+    /**
+     * Copies the node identifier and payload of the given bulletins into a new adapted object.
+     *
+     * @param hb the bulletins to convert; may be <code>null</code>
+     * @return a new {@link AdaptedNodeBulletins}; nothing is copied into it when
+     * <code>hb</code> is <code>null</code>
+     */
+    @Override
+    public AdaptedNodeBulletins marshal(final NodeBulletins hb) {
+        final AdaptedNodeBulletins adaptedBulletins = new AdaptedNodeBulletins();
+
+        if (hb != null) {
+            adaptedBulletins.setNodeIdentifier(hb.getNodeIdentifier());
+            adaptedBulletins.setPayload(hb.getPayload());
+        }
+
+        return adaptedBulletins;
+    }
+
+    /**
+     * Builds a {@link NodeBulletins} from the node identifier and payload of the adapted object.
+     *
+     * @param adaptedBulletins the adapted object to convert; may be <code>null</code>
+     * @return a new {@link NodeBulletins}, or <code>null</code> when
+     * <code>adaptedBulletins</code> is <code>null</code>
+     */
+    @Override
+    public NodeBulletins unmarshal(final AdaptedNodeBulletins adaptedBulletins) {
+        // An adapter may be handed a null value; return null instead of failing
+        // with a NullPointerException, as NodeIdentifierAdapter does.
+        if (adaptedBulletins == null) {
+            return null;
+        }
+
+        return new NodeBulletins(adaptedBulletins.getNodeIdentifier(), adaptedBulletins.getPayload());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
new file mode 100644
index 0000000..fe2d8a4
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * JAXB adapter that converts between {@link NodeIdentifier} (the bound type) and
+ * {@link AdaptedNodeIdentifier} (the value type).
+ *
+ * @author unattributed
+ */
+public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, NodeIdentifier> {
+
+    /**
+     * Copies the id, API address/port and socket address/port of the identifier
+     * into a new adapted object.
+     *
+     * @param ni the identifier to convert; may be <code>null</code>
+     * @return the adapted identifier, or <code>null</code> when <code>ni</code> is <code>null</code>
+     */
+    @Override
+    public AdaptedNodeIdentifier marshal(final NodeIdentifier ni) {
+        if (ni == null) {
+            return null;
+        }
+
+        final AdaptedNodeIdentifier adapted = new AdaptedNodeIdentifier();
+        adapted.setId(ni.getId());
+        adapted.setApiAddress(ni.getApiAddress());
+        adapted.setApiPort(ni.getApiPort());
+        adapted.setSocketAddress(ni.getSocketAddress());
+        adapted.setSocketPort(ni.getSocketPort());
+        return adapted;
+    }
+
+    /**
+     * Builds a {@link NodeIdentifier} from the values held by the adapted object.
+     *
+     * @param aNi the adapted identifier to convert; may be <code>null</code>
+     * @return the node identifier, or <code>null</code> when <code>aNi</code> is <code>null</code>
+     */
+    @Override
+    public NodeIdentifier unmarshal(final AdaptedNodeIdentifier aNi) {
+        if (aNi == null) {
+            return null;
+        }
+
+        return new NodeIdentifier(
+                aNi.getId(),
+                aNi.getApiAddress(),
+                aNi.getApiPort(),
+                aNi.getSocketAddress(),
+                aNi.getSocketPort());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
new file mode 100644
index 0000000..1613536
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.XmlRegistry;
+
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+
+/**
+ * JAXB registry with one factory method per protocol message class. Every method
+ * returns a new instance created with the message's no-argument constructor.
+ * Methods are listed in alphabetical order.
+ *
+ * @author unattributed
+ */
+@XmlRegistry
+public class ObjectFactory {
+
+    public ObjectFactory() {}
+
+    /** @return a newly constructed {@link NodeBulletinsMessage} */
+    public NodeBulletinsMessage createBulletinsMessage() {
+        return new NodeBulletinsMessage();
+    }
+
+    /** @return a newly constructed {@link ConnectionRequestMessage} */
+    public ConnectionRequestMessage createConnectionRequestMessage() {
+        return new ConnectionRequestMessage();
+    }
+
+    /** @return a newly constructed {@link ConnectionResponseMessage} */
+    public ConnectionResponseMessage createConnectionResponseMessage() {
+        return new ConnectionResponseMessage();
+    }
+
+    /** @return a newly constructed {@link ControllerStartupFailureMessage} */
+    public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
+        return new ControllerStartupFailureMessage();
+    }
+
+    /** @return a newly constructed {@link DisconnectMessage} */
+    public DisconnectMessage createDisconnectionMessage() {
+        return new DisconnectMessage();
+    }
+
+    /** @return a newly constructed {@link FlowRequestMessage} */
+    public FlowRequestMessage createFlowRequestMessage() {
+        return new FlowRequestMessage();
+    }
+
+    /** @return a newly constructed {@link FlowResponseMessage} */
+    public FlowResponseMessage createFlowResponseMessage() {
+        return new FlowResponseMessage();
+    }
+
+    /** @return a newly constructed {@link HeartbeatMessage} */
+    public HeartbeatMessage createHeartbeatMessage() {
+        return new HeartbeatMessage();
+    }
+
+    /** @return a newly constructed {@link MulticastProtocolMessage} */
+    public MulticastProtocolMessage createMulticastProtocolMessage() {
+        return new MulticastProtocolMessage();
+    }
+
+    /** @return a newly constructed {@link PingMessage} */
+    public PingMessage createPingMessage() {
+        return new PingMessage();
+    }
+
+    /** @return a newly constructed {@link PrimaryRoleAssignmentMessage} */
+    public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
+        return new PrimaryRoleAssignmentMessage();
+    }
+
+    /** @return a newly constructed {@link ReconnectionFailureMessage} */
+    public ReconnectionFailureMessage createReconnectionFailureMessage() {
+        return new ReconnectionFailureMessage();
+    }
+
+    /** @return a newly constructed {@link ReconnectionRequestMessage} */
+    public ReconnectionRequestMessage createReconnectionRequestMessage() {
+        return new ReconnectionRequestMessage();
+    }
+
+    /** @return a newly constructed {@link ReconnectionResponseMessage} */
+    public ReconnectionResponseMessage createReconnectionResponseMessage() {
+        return new ReconnectionResponseMessage();
+    }
+
+    /** @return a newly constructed {@link ServiceBroadcastMessage} */
+    public ServiceBroadcastMessage createServiceBroadcastMessage() {
+        return new ServiceBroadcastMessage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java
new file mode 100644
index 0000000..344de4e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+
+/**
+ * Protocol message of type {@link MessageType#CONNECTION_REQUEST} that carries a
+ * {@link ConnectionRequest}. Bound to the XML root element
+ * <code>connectionRequestMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "connectionRequestMessage")
+public class ConnectionRequestMessage extends ProtocolMessage {
+
+    private ConnectionRequest connectionRequest;
+
+    public ConnectionRequestMessage() {}
+
+    /**
+     * @return the connection request held by this message, or <code>null</code> if none has been set
+     */
+    public ConnectionRequest getConnectionRequest() {
+        return this.connectionRequest;
+    }
+
+    /**
+     * @param connectionRequest the connection request this message should carry
+     */
+    public void setConnectionRequest(final ConnectionRequest connectionRequest) {
+        this.connectionRequest = connectionRequest;
+    }
+
+    /**
+     * @return always {@link MessageType#CONNECTION_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.CONNECTION_REQUEST;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java
new file mode 100644
index 0000000..a262d7a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+
+/**
+ * Protocol message of type {@link MessageType#CONNECTION_RESPONSE} that carries a
+ * {@link ConnectionResponse} and, optionally, the DN of the cluster manager (NCM).
+ * Bound to the XML root element <code>connectionResponseMessage</code>.
+ *
+ * A non-null DN stored on this message is pushed into the connection response,
+ * regardless of which of the two setters is called first.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "connectionResponseMessage")
+public class ConnectionResponseMessage extends ProtocolMessage {
+
+    private ConnectionResponse connectionResponse;
+    private String clusterManagerDN;
+
+    public ConnectionResponseMessage() {}
+
+    /**
+     * @return the connection response held by this message, or <code>null</code> if none has been set
+     */
+    public ConnectionResponse getConnectionResponse() {
+        return connectionResponse;
+    }
+
+    /**
+     * Stores the connection response and, when a cluster manager DN has already been
+     * set on this message, copies that DN into the response.
+     *
+     * @param connectionResponse the response to carry; may be <code>null</code>
+     */
+    public void setConnectionResponse(final ConnectionResponse connectionResponse) {
+        this.connectionResponse = connectionResponse;
+
+        // Only propagate the DN when there is a response to receive it; without the
+        // null check, passing null after a DN had been set threw a NullPointerException.
+        if (connectionResponse != null && clusterManagerDN != null) {
+            connectionResponse.setClusterManagerDN(clusterManagerDN);
+        }
+    }
+
+    /**
+     * Stores the DN of the cluster manager and, when a connection response is already
+     * present, sets the same value (including <code>null</code>) on that response.
+     *
+     * @param dn the DN of the cluster manager; may be <code>null</code>
+     */
+    public void setClusterManagerDN(final String dn) {
+        if (connectionResponse != null) {
+            connectionResponse.setClusterManagerDN(dn);
+        }
+        this.clusterManagerDN = dn;
+    }
+
+    /**
+     * Returns the DN of the NCM, if it is available or <code>null</code> otherwise.
+     *
+     * @return the DN last passed to {@link #setClusterManagerDN(String)}, or
+     * <code>null</code> if none has been set
+     */
+    public String getClusterManagerDN() {
+        return clusterManagerDN;
+    }
+
+    /**
+     * @return always {@link MessageType#CONNECTION_RESPONSE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.CONNECTION_RESPONSE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
new file mode 100644
index 0000000..ebc1cae
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * {@link ExceptionMessage} of type {@link MessageType#CONTROLLER_STARTUP_FAILURE}
+ * that additionally carries a {@link NodeIdentifier}. Bound to the XML root element
+ * <code>controllerStartupFailureMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "controllerStartupFailureMessage")
+public class ControllerStartupFailureMessage extends ExceptionMessage {
+
+    private NodeIdentifier nodeId;
+
+    public ControllerStartupFailureMessage() {}
+
+    /**
+     * @return always {@link MessageType#CONTROLLER_STARTUP_FAILURE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.CONTROLLER_STARTUP_FAILURE;
+    }
+
+    /**
+     * @return the node identifier held by this message, or <code>null</code> if none has been set;
+     * converted for XML binding by {@link NodeIdentifierAdapter}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @param nodeId the node identifier this message should carry
+     */
+    public void setNodeId(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java
new file mode 100644
index 0000000..8aa7a40
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * Protocol message of type {@link MessageType#DISCONNECTION_REQUEST} that carries a
+ * {@link NodeIdentifier} and a textual explanation. Bound to the XML root element
+ * <code>disconnectionMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "disconnectionMessage")
+public class DisconnectMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId;
+    private String explanation;
+
+    /**
+     * @return the node identifier held by this message, or <code>null</code> if none has been set;
+     * converted for XML binding by {@link NodeIdentifierAdapter}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @param nodeId the node identifier this message should carry
+     */
+    public void setNodeId(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return the explanation held by this message, or <code>null</code> if none has been set
+     */
+    public String getExplanation() {
+        return this.explanation;
+    }
+
+    /**
+     * @param explanation the explanation this message should carry
+     */
+    public void setExplanation(final String explanation) {
+        this.explanation = explanation;
+    }
+
+    /**
+     * @return always {@link MessageType#DISCONNECTION_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.DISCONNECTION_REQUEST;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java
new file mode 100644
index 0000000..99a6dee
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Protocol message of type {@link MessageType#EXCEPTION} that carries an exception
+ * message string. Bound to the XML root element <code>exceptionMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "exceptionMessage")
+public class ExceptionMessage extends ProtocolMessage {
+
+    private String exceptionMessage;
+
+    public ExceptionMessage() {}
+
+    /**
+     * @return the exception message text, or <code>null</code> if none has been set
+     */
+    public String getExceptionMessage() {
+        return this.exceptionMessage;
+    }
+
+    /**
+     * @param exceptionMessage the exception message text this message should carry
+     */
+    public void setExceptionMessage(final String exceptionMessage) {
+        this.exceptionMessage = exceptionMessage;
+    }
+
+    /**
+     * @return {@link MessageType#EXCEPTION}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.EXCEPTION;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java
new file mode 100644
index 0000000..4a10538
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * Protocol message of type {@link MessageType#FLOW_REQUEST} that carries a
+ * {@link NodeIdentifier}. Bound to the XML root element <code>flowRequestMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "flowRequestMessage")
+public class FlowRequestMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId;
+
+    /**
+     * @return the node identifier held by this message, or <code>null</code> if none has been set;
+     * converted for XML binding by {@link NodeIdentifierAdapter}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @param nodeId the node identifier this message should carry
+     */
+    public void setNodeId(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return always {@link MessageType#FLOW_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.FLOW_REQUEST;
+    }
+
+}