You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:40 UTC
[17/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
new file mode 100644
index 0000000..dc86d24
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.ssl.SSLSocket;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+
+public class NodeProtocolSenderImpl implements NodeProtocolSender {
+ private final SocketConfiguration socketConfiguration;
+ private final ClusterServiceLocator clusterManagerProtocolServiceLocator;
+ private final ProtocolContext<ProtocolMessage> protocolContext;
+
+ public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator,
+ final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
+ if(clusterManagerProtocolServiceLocator == null) {
+ throw new IllegalArgumentException("Protocol Service Locator may not be null.");
+ } else if(socketConfiguration == null) {
+ throw new IllegalArgumentException("Socket configuration may not be null.");
+ } else if(protocolContext == null) {
+ throw new IllegalArgumentException("Protocol Context may not be null.");
+ }
+
+ this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator;
+ this.socketConfiguration = socketConfiguration;
+ this.protocolContext = protocolContext;
+ }
+
+
+ @Override
+ public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ Socket socket = null;
+ try {
+ socket = createSocket();
+
+ String ncmDn = null;
+ if ( socket instanceof SSLSocket ) {
+ final SSLSocket sslSocket = (SSLSocket) socket;
+ try {
+ final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
+ if ( certChains != null && certChains.length > 0 ) {
+ ncmDn = certChains[0].getSubjectDN().getName();
+ }
+ } catch (final ProtocolException pe) {
+ throw pe;
+ } catch (final Exception e) {
+ throw new ProtocolException(e);
+ }
+ }
+
+ try {
+ // marshal message to output stream
+ final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+ marshaller.marshal(msg, socket.getOutputStream());
+ } catch(final IOException ioe) {
+ throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+ }
+
+ final ProtocolMessage response;
+ try {
+ // unmarshall response and return
+ final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+ response = unmarshaller.unmarshal(socket.getInputStream());
+ } catch(final IOException ioe) {
+ throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
+ }
+
+ if(MessageType.CONNECTION_RESPONSE == response.getType()) {
+ final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response;
+ connectionResponse.setClusterManagerDN(ncmDn);
+ return connectionResponse;
+ } else {
+ throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
+ }
+ } finally {
+ SocketUtils.closeQuietly(socket);
+ }
+ }
+
+
+ @Override
+ public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sendProtocolMessage(msg);
+ }
+
+ @Override
+ public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sendProtocolMessage(msg);
+ }
+
+ @Override
+ public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sendProtocolMessage(msg);
+ }
+
+ @Override
+ public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sendProtocolMessage(msg);
+ }
+
+ private Socket createSocket() {
+ // determine the cluster manager's address
+ final DiscoverableService service = clusterManagerProtocolServiceLocator.getService();
+ if(service == null) {
+ throw new UnknownServiceAddressException("Cluster Manager's service is not known. Verify a cluster manager is running.");
+ }
+
+ try {
+ // create a socket
+ return SocketUtils.createSocket(service.getServiceAddress(), socketConfiguration);
+ } catch(final IOException ioe) {
+ throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
+ }
+ }
+
+ private void sendProtocolMessage(final ProtocolMessage msg) {
+ Socket socket = null;
+ try {
+ socket = createSocket();
+
+ try {
+ // marshal message to output stream
+ final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+ marshaller.marshal(msg, socket.getOutputStream());
+ } catch(final IOException ioe) {
+ throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+ }
+ } finally {
+ SocketUtils.closeQuietly(socket);
+ }
+ }
+
+ public SocketConfiguration getSocketConfiguration() {
+ return socketConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
new file mode 100644
index 0000000..4b359f4
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.reporting.BulletinRepository;
+
+public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
+
+ private final NodeProtocolSender sender;
+ private final ProtocolListener listener;
+
+ public NodeProtocolSenderListener(final NodeProtocolSender sender, final ProtocolListener listener) {
+ if(sender == null) {
+ throw new IllegalArgumentException("NodeProtocolSender may not be null.");
+ } else if(listener == null) {
+ throw new IllegalArgumentException("ProtocolListener may not be null.");
+ }
+ this.sender = sender;
+ this.listener = listener;
+ }
+
+ @Override
+ public void stop() throws IOException {
+ if(!isRunning()) {
+ throw new IllegalStateException("Instance is already stopped.");
+ }
+ listener.stop();
+ }
+
+ @Override
+ public void start() throws IOException {
+ if(isRunning()) {
+ throw new IllegalStateException("Instance is already started.");
+ }
+ listener.start();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return listener.isRunning();
+ }
+
+ @Override
+ public boolean removeHandler(final ProtocolHandler handler) {
+ return listener.removeHandler(handler);
+ }
+
+ @Override
+ public Collection<ProtocolHandler> getHandlers() {
+ return listener.getHandlers();
+ }
+
+ @Override
+ public void addHandler(final ProtocolHandler handler) {
+ listener.addHandler(handler);
+ }
+
+ @Override
+ public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sender.heartbeat(msg);
+ }
+
+ @Override
+ public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ return sender.requestConnection(msg);
+ }
+
+ @Override
+ public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sender.notifyControllerStartupFailure(msg);
+ }
+
+ @Override
+ public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sender.notifyReconnectionFailure(msg);
+ }
+
+ @Override
+ public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
+ sender.sendBulletins(msg);
+ }
+
+ @Override
+ public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+ listener.setBulletinRepository(bulletinRepository);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
new file mode 100644
index 0000000..ca30d9b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLSocket;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.io.socket.ServerSocketConfiguration;
+import org.apache.nifi.io.socket.SocketListener;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.StopWatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for protocol messages sent over unicast socket.
+ * <p>
+ * {@link #dispatchRequest(Socket)} unmarshals one request from the socket,
+ * passes it to the first registered handler that reports it can handle the
+ * request and, when the handler returns a response, marshals that response
+ * back onto the same socket. Failures are logged and, if a bulletin
+ * repository has been set, also reported as a bulletin.
+ *
+ * @author unattributed
+ */
+public class SocketProtocolListener extends SocketListener implements ProtocolListener {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class));
+
+    /** Limit passed to the copying stream: don't buffer more than 1 MB of the message. */
+    private static final int MAX_MESSAGE_BUFFER_BYTES = 1024 * 1024;
+
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
+    private volatile BulletinRepository bulletinRepository;
+
+    /**
+     * @param numThreads passed through to the {@code SocketListener} constructor
+     * @param port passed through to the {@code SocketListener} constructor
+     * @param configuration passed through to the {@code SocketListener} constructor
+     * @param protocolContext source of the marshaller and unmarshaller for protocol messages
+     * @throws IllegalArgumentException if {@code protocolContext} is null
+     */
+    public SocketProtocolListener(
+            final int numThreads,
+            final int port,
+            final ServerSocketConfiguration configuration,
+            final ProtocolContext<ProtocolMessage> protocolContext) {
+
+        super(numThreads, port, configuration);
+
+        if (protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        }
+
+        this.protocolContext = protocolContext;
+    }
+
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        this.bulletinRepository = bulletinRepository;
+    }
+
+    /**
+     * @throws IllegalStateException if the listener is already running
+     */
+    @Override
+    public void start() throws IOException {
+        if (super.isRunning()) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+
+        super.start();
+    }
+
+    /**
+     * @throws IOException if the listener is not running
+     */
+    @Override
+    public void stop() throws IOException {
+        if (!super.isRunning()) {
+            throw new IOException("Instance is already stopped.");
+        }
+
+        super.stop();
+    }
+
+    /**
+     * @return an unmodifiable view of the registered handlers
+     */
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return Collections.unmodifiableCollection(handlers);
+    }
+
+    /**
+     * @throws NullPointerException if {@code handler} is null
+     */
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        if (handler == null) {
+            throw new NullPointerException("Protocol handler may not be null.");
+        }
+        handlers.add(handler);
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        return handlers.remove(handler);
+    }
+
+    /**
+     * Reads one protocol message from the socket, dispatches it to a handler
+     * and writes the handler's response, if any, back to the socket.
+     * I/O and protocol failures are not propagated; they are logged and
+     * reported through the bulletin repository when one is set.
+     *
+     * @param socket the socket to read the request from and write the response to
+     */
+    @Override
+    public void dispatchRequest(final Socket socket) {
+        String hostname = null;
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            hostname = socket.getInetAddress().getHostName();
+            final String requestId = UUID.randomUUID().toString();
+            logger.info("Received request {} from {}", requestId, hostname);
+
+            final String requestorDn = getRequestorDn(socket);
+
+            // unmarshall message
+            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+            final InputStream inStream = socket.getInputStream();
+            final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, MAX_MESSAGE_BUFFER_BYTES);
+
+            final ProtocolMessage request = unmarshaller.unmarshal(copyingInputStream);
+            final byte[] receivedMessage = copyingInputStream.getBytesRead();
+
+            // Logged only after the message has been read; logging before the
+            // read would report the count before any byte was consumed.
+            logger.debug("Request {} has a message length of {}", requestId, copyingInputStream.getNumberOfBytesCopied());
+
+            request.setRequestorDN(requestorDn);
+
+            // dispatch message to the first handler that accepts it
+            final ProtocolHandler desiredHandler = findHandler(request);
+            if (desiredHandler == null) {
+                throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
+            }
+
+            final ProtocolMessage response = desiredHandler.handle(request);
+            if (response != null) {
+                try {
+                    logger.debug("Sending response for request {}", requestId);
+
+                    // marshal message to output stream
+                    final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                    marshaller.marshal(response, socket.getOutputStream());
+                } catch (final IOException ioe) {
+                    throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to " + ioe, ioe);
+                }
+            }
+
+            stopWatch.stop();
+            logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+        } catch (final IOException | ProtocolException e) {
+            reportFailure(hostname, e);
+        }
+    }
+
+    /**
+     * Returns the subject DN of the first peer certificate when the socket is
+     * an SSL socket; returns null for any other socket or when the peer
+     * certificate chain is null or empty.
+     */
+    private String getRequestorDn(final Socket socket) {
+        if (!(socket instanceof SSLSocket)) {
+            return null;
+        }
+
+        try {
+            final X509Certificate[] certChains = ((SSLSocket) socket).getSession().getPeerCertificateChain();
+            if (certChains != null && certChains.length > 0) {
+                return certChains[0].getSubjectDN().getName();
+            }
+            return null;
+        } catch (final ProtocolException pe) {
+            throw pe;
+        } catch (final Exception e) {
+            throw new ProtocolException(e);
+        }
+    }
+
+    /**
+     * Returns the first registered handler that can handle the request, or
+     * null when there is none.
+     */
+    private ProtocolHandler findHandler(final ProtocolMessage request) {
+        for (final ProtocolHandler handler : getHandlers()) {
+            if (handler.canHandle(request)) {
+                return handler;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Logs a processing failure and, when a bulletin repository is set, adds
+     * a matching warning bulletin.
+     */
+    private void reportFailure(final String hostname, final Exception e) {
+        logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
+
+        // Read the volatile field once so that the null check and the use
+        // below see the same repository.
+        final BulletinRepository repository = bulletinRepository;
+        if (repository != null) {
+            final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
+            repository.addBulletin(bulletin);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
new file mode 100644
index 0000000..bc68630
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+
+/**
+ * Implements a context for communicating internally amongst the cluster using
+ * JAXB.
+ * <p>
+ * Each message is framed as one sentinel byte, followed by the message size
+ * written with {@link DataOutputStream#writeInt(int)}, followed by that many
+ * bytes of JAXB-marshalled content.
+ *
+ * @param <T> The type of protocol message.
+ *
+ * @author unattributed
+ */
+public class JaxbProtocolContext<T> implements ProtocolContext {
+
+    /*
+     * A sentinel is used to detect corrupted messages. Relying on the integrity
+     * of the message size can cause memory issues if the value is corrupted
+     * and equal to a number larger than the memory size.
+     */
+    private static final byte MESSAGE_PROTOCOL_START_SENTINEL = 0x5A;
+
+    private final JAXBContext jaxbCtx;
+
+    /**
+     * @param jaxbCtx the JAXB context used to create marshallers and unmarshallers
+     */
+    public JaxbProtocolContext(final JAXBContext jaxbCtx) {
+        this.jaxbCtx = jaxbCtx;
+    }
+
+    /**
+     * @return a marshaller that writes the sentinel, the message size and the
+     * JAXB-marshalled message to the given stream, then flushes it
+     */
+    @Override
+    public ProtocolMessageMarshaller<T> createMarshaller() {
+        return new ProtocolMessageMarshaller<T>() {
+
+            @Override
+            public void marshal(final T msg, final OutputStream os) throws IOException {
+                try {
+                    // marshal the message into memory first so its size is known
+                    final Marshaller marshaller = jaxbCtx.createMarshaller();
+                    final ByteArrayOutputStream msgBytes = new ByteArrayOutputStream();
+                    marshaller.marshal(msg, msgBytes);
+
+                    final DataOutputStream dos = new DataOutputStream(os);
+
+                    // write message protocol sentinel
+                    dos.write(MESSAGE_PROTOCOL_START_SENTINEL);
+
+                    // write message size in bytes
+                    dos.writeInt(msgBytes.size());
+
+                    // write message straight from the buffer, without copying it first
+                    msgBytes.writeTo(dos);
+
+                    dos.flush();
+                } catch (final JAXBException je) {
+                    throw new IOException("Failed marshalling protocol message due to: " + je, je);
+                }
+            }
+        };
+    }
+
+    /**
+     * @return an unmarshaller that reads one framed message from the given
+     * stream. It throws {@link EOFException} when the stream ends before or
+     * inside a message and {@link IOException} when the header is malformed
+     * or the content cannot be unmarshalled.
+     */
+    @Override
+    public ProtocolMessageUnmarshaller<T> createUnmarshaller() {
+        return new ProtocolMessageUnmarshaller<T>() {
+
+            @Override
+            public T unmarshal(final InputStream is) throws IOException {
+                try {
+                    final DataInputStream dis = new DataInputStream(is);
+
+                    // Test for end-of-stream on the int returned by read():
+                    // narrowing to a byte first would make a 0xFF data byte
+                    // indistinguishable from the -1 end-of-stream marker.
+                    final int firstByte = dis.read();
+                    if (firstByte == -1) {
+                        throw new EOFException();
+                    }
+
+                    // check for the presence of the message protocol sentinel
+                    if (MESSAGE_PROTOCOL_START_SENTINEL != (byte) firstByte) {
+                        throw new IOException("Failed reading protocol message due to malformed header");
+                    }
+
+                    // read the message size; a negative value can only come from a corrupted header
+                    final int msgBytesSize = dis.readInt();
+                    if (msgBytesSize < 0) {
+                        throw new IOException("Failed reading protocol message due to malformed header: negative message size " + msgBytesSize);
+                    }
+
+                    // read the message; readFully throws EOFException if the
+                    // stream ends before all announced bytes have arrived
+                    final byte[] msg = new byte[msgBytesSize];
+                    dis.readFully(msg);
+
+                    // unmarshall message and return
+                    final Unmarshaller unmarshaller = jaxbCtx.createUnmarshaller();
+                    @SuppressWarnings("unchecked") // the JAXB context decides the runtime type; T cannot be checked here
+                    final T result = (T) unmarshaller.unmarshal(new ByteArrayInputStream(msg));
+                    return result;
+                } catch (final JAXBException je) {
+                    throw new IOException("Failed unmarshalling protocol message due to: " + je, je);
+                }
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
new file mode 100644
index 0000000..d9de24f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Mutable bean holding the node identifier of a connection request. The
+ * identifier property is bound through {@link NodeIdentifierAdapter}.
+ *
+ * @author unattributed
+ */
+public class AdaptedConnectionRequest {
+
+    private NodeIdentifier nodeIdentifier;
+
+    /** Creates an instance with no node identifier set. */
+    public AdaptedConnectionRequest() {
+    }
+
+    /**
+     * @return the node identifier, or null if none has been set
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * @param nodeIdentifier the node identifier to store
+     */
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
new file mode 100644
index 0000000..c7c783b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * Mutable bean holding the fields of a connection response. The data flow and
+ * node identifier properties are bound through {@link DataFlowAdapter} and
+ * {@link NodeIdentifierAdapter} respectively.
+ *
+ * @author unattributed
+ */
+public class AdaptedConnectionResponse {
+
+    private StandardDataFlow dataFlow;
+    private NodeIdentifier nodeIdentifier;
+    private boolean blockedByFirewall;
+    private boolean primary;
+    private int tryLaterSeconds;
+    private Integer managerRemoteInputPort;
+    private Boolean managerRemoteCommsSecure;
+    private String instanceId;
+
+    /** Creates an instance with every field at its default value. */
+    public AdaptedConnectionResponse() {
+    }
+
+    /**
+     * @return the data flow, or null if none has been set
+     */
+    @XmlJavaTypeAdapter(DataFlowAdapter.class)
+    public StandardDataFlow getDataFlow() {
+        return this.dataFlow;
+    }
+
+    public void setDataFlow(final StandardDataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
+
+    /**
+     * @return the node identifier, or null if none has been set
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    public int getTryLaterSeconds() {
+        return this.tryLaterSeconds;
+    }
+
+    public void setTryLaterSeconds(final int tryLaterSeconds) {
+        this.tryLaterSeconds = tryLaterSeconds;
+    }
+
+    public boolean isBlockedByFirewall() {
+        return this.blockedByFirewall;
+    }
+
+    public void setBlockedByFirewall(final boolean blockedByFirewall) {
+        this.blockedByFirewall = blockedByFirewall;
+    }
+
+    public boolean isPrimary() {
+        return this.primary;
+    }
+
+    public void setPrimary(final boolean primary) {
+        this.primary = primary;
+    }
+
+    /**
+     * @return true when the try-later delay is greater than zero seconds
+     */
+    public boolean shouldTryLater() {
+        return 0 < this.tryLaterSeconds;
+    }
+
+    public void setManagerRemoteInputPort(final Integer managerRemoteInputPort) {
+        this.managerRemoteInputPort = managerRemoteInputPort;
+    }
+
+    public Integer getManagerRemoteInputPort() {
+        return this.managerRemoteInputPort;
+    }
+
+    public void setManagerRemoteCommsSecure(final Boolean secure) {
+        this.managerRemoteCommsSecure = secure;
+    }
+
+    public Boolean isManagerRemoteCommsSecure() {
+        return this.managerRemoteCommsSecure;
+    }
+
+    public void setInstanceId(final String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    public String getInstanceId() {
+        return this.instanceId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
new file mode 100644
index 0000000..89d903b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+/**
+ * Plain mutable bean holding a counter's group name, name and value.
+ * <p>
+ * The class performs no validation: every setter stores its argument as given and every
+ * getter returns the stored value unchanged.
+ *
+ * @author unattributed
+ */
+public class AdaptedCounter {
+
+    private String groupName;
+    private String name;
+    private long value;
+
+    /**
+     * Creates an instance whose fields hold their Java defaults
+     * ({@code null} for the two names, {@code 0} for the value).
+     */
+    public AdaptedCounter() {
+    }
+
+    /**
+     * @return the stored group name, or {@code null} if none has been set
+     */
+    public String getGroupName() {
+        return this.groupName;
+    }
+
+    /**
+     * @param counterGroupName the group name to store
+     */
+    public void setGroupName(String counterGroupName) {
+        this.groupName = counterGroupName;
+    }
+
+    /**
+     * @return the stored counter name, or {@code null} if none has been set
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param counterName the counter name to store
+     */
+    public void setName(String counterName) {
+        this.name = counterName;
+    }
+
+    /**
+     * @return the stored counter value
+     */
+    public long getValue() {
+        return this.value;
+    }
+
+    /**
+     * @param value the counter value to store
+     */
+    public void setValue(long value) {
+        this.value = value;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
new file mode 100644
index 0000000..bb97619
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+/**
+ * Plain mutable bean holding the three byte arrays of a data flow (flow, templates,
+ * snippets) together with an auto-start flag.
+ * <p>
+ * Arrays are stored and returned by reference: no defensive copy is made on either side,
+ * so a caller that mutates an array it passed in (or got back) changes this object's state.
+ *
+ * @author unattributed
+ */
+public class AdaptedDataFlow {
+
+    private byte[] flow;
+    private byte[] templates;
+    private byte[] snippets;
+    private boolean autoStartProcessors;
+
+    /**
+     * Creates an instance with all arrays {@code null} and the auto-start flag {@code false}.
+     */
+    public AdaptedDataFlow() {
+    }
+
+    /**
+     * @return the stored flow bytes (same array instance that was set), possibly {@code null}
+     */
+    public byte[] getFlow() {
+        return this.flow;
+    }
+
+    /**
+     * @param flow the flow bytes to store; kept by reference
+     */
+    public void setFlow(byte[] flow) {
+        this.flow = flow;
+    }
+
+    /**
+     * @return the stored template bytes (same array instance that was set), possibly {@code null}
+     */
+    public byte[] getTemplates() {
+        return this.templates;
+    }
+
+    /**
+     * @param templates the template bytes to store; kept by reference
+     */
+    public void setTemplates(byte[] templates) {
+        this.templates = templates;
+    }
+
+    /**
+     * @return the stored snippet bytes (same array instance that was set), possibly {@code null}
+     */
+    public byte[] getSnippets() {
+        return this.snippets;
+    }
+
+    /**
+     * @param snippets the snippet bytes to store; kept by reference
+     */
+    public void setSnippets(byte[] snippets) {
+        this.snippets = snippets;
+    }
+
+    /**
+     * @return the stored auto-start flag
+     */
+    public boolean isAutoStartProcessors() {
+        return this.autoStartProcessors;
+    }
+
+    /**
+     * @param runningAllProcessors the value to store as the auto-start flag
+     */
+    public void setAutoStartProcessors(boolean runningAllProcessors) {
+        this.autoStartProcessors = runningAllProcessors;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
new file mode 100644
index 0000000..5b9d9b7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Plain mutable bean carrying the four pieces of a heartbeat: node identifier, payload
+ * bytes, primary flag and connected flag.
+ * <p>
+ * The node identifier getter is annotated so that JAXB binds it through
+ * {@link NodeIdentifierAdapter}. The payload array is stored and returned by reference.
+ *
+ * @author unattributed
+ */
+public class AdaptedHeartbeat {
+
+    private NodeIdentifier nodeIdentifier;
+    private byte[] payload;
+    private boolean primary;
+    private boolean connected;
+
+    /**
+     * Creates an instance with a {@code null} identifier and payload and both flags {@code false}.
+     */
+    public AdaptedHeartbeat() {
+    }
+
+    /**
+     * @return the stored node identifier, possibly {@code null}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * @param nodeIdentifier the node identifier to store
+     */
+    public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    /**
+     * @return the stored primary flag
+     */
+    public boolean isPrimary() {
+        return this.primary;
+    }
+
+    /**
+     * @param primary the primary flag to store
+     */
+    public void setPrimary(boolean primary) {
+        this.primary = primary;
+    }
+
+    /**
+     * @return the stored connected flag
+     */
+    public boolean isConnected() {
+        return this.connected;
+    }
+
+    /**
+     * @param connected the connected flag to store
+     */
+    public void setConnected(boolean connected) {
+        this.connected = connected;
+    }
+
+    /**
+     * @return the stored payload bytes (same array instance that was set), possibly {@code null}
+     */
+    public byte[] getPayload() {
+        return this.payload;
+    }
+
+    /**
+     * @param payload the payload bytes to store; kept by reference
+     */
+    public void setPayload(byte[] payload) {
+        this.payload = payload;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
new file mode 100644
index 0000000..98e2438
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Plain mutable bean pairing a node identifier with a payload byte array.
+ * <p>
+ * The node identifier getter is annotated so that JAXB binds it through
+ * {@link NodeIdentifierAdapter}. The payload array is stored and returned by reference.
+ *
+ * @author unattributed
+ */
+public class AdaptedNodeBulletins {
+
+    private NodeIdentifier nodeIdentifier;
+    private byte[] payload;
+
+    /**
+     * Creates an instance with a {@code null} identifier and a {@code null} payload.
+     */
+    public AdaptedNodeBulletins() {
+    }
+
+    /**
+     * @return the stored node identifier, possibly {@code null}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /**
+     * @param nodeIdentifier the node identifier to store
+     */
+    public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    /**
+     * @return the stored payload bytes (same array instance that was set), possibly {@code null}
+     */
+    public byte[] getPayload() {
+        return this.payload;
+    }
+
+    /**
+     * @param payload the payload bytes to store; kept by reference
+     */
+    public void setPayload(byte[] payload) {
+        this.payload = payload;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
new file mode 100644
index 0000000..8134ea3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+/**
+ * Plain mutable bean holding the five values of a node identifier: an id, an API
+ * address/port pair and a socket address/port pair.
+ * <p>
+ * No validation is performed; setters store their argument as given.
+ *
+ * @author unattributed
+ */
+public class AdaptedNodeIdentifier {
+
+    private String id;
+    private String apiAddress;
+    private int apiPort;
+    private String socketAddress;
+    private int socketPort;
+
+    /**
+     * Creates an instance with {@code null} strings and both ports set to {@code 0}.
+     */
+    public AdaptedNodeIdentifier() {
+    }
+
+    /**
+     * @return the stored API address, possibly {@code null}
+     */
+    public String getApiAddress() {
+        return this.apiAddress;
+    }
+
+    /**
+     * @param apiAddress the API address to store
+     */
+    public void setApiAddress(String apiAddress) {
+        this.apiAddress = apiAddress;
+    }
+
+    /**
+     * @return the stored API port
+     */
+    public int getApiPort() {
+        return this.apiPort;
+    }
+
+    /**
+     * @param apiPort the API port to store; no range check is applied
+     */
+    public void setApiPort(int apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    /**
+     * @return the stored id, possibly {@code null}
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id the id to store
+     */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the stored socket address, possibly {@code null}
+     */
+    public String getSocketAddress() {
+        return this.socketAddress;
+    }
+
+    /**
+     * @param socketAddress the socket address to store
+     */
+    public void setSocketAddress(String socketAddress) {
+        this.socketAddress = socketAddress;
+    }
+
+    /**
+     * @return the stored socket port
+     */
+    public int getSocketPort() {
+        return this.socketPort;
+    }
+
+    /**
+     * @param socketPort the socket port to store; no range check is applied
+     */
+    public void setSocketPort(int socketPort) {
+        this.socketPort = socketPort;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
new file mode 100644
index 0000000..1f91cf1
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+
+/**
+ * JAXB adapter converting between {@link ConnectionRequest} and
+ * {@code AdaptedConnectionRequest}.
+ *
+ * @author unattributed
+ */
+public class ConnectionRequestAdapter extends XmlAdapter<AdaptedConnectionRequest, ConnectionRequest> {
+
+    /**
+     * Builds the adapted form of a connection request.
+     *
+     * @param cr the request to convert; may be {@code null}
+     * @return a new adapted request carrying the proposed node identifier of {@code cr};
+     *         when {@code cr} is {@code null} the returned object is left unpopulated
+     *         (never {@code null})
+     */
+    @Override
+    public AdaptedConnectionRequest marshal(final ConnectionRequest cr) {
+        final AdaptedConnectionRequest adapted = new AdaptedConnectionRequest();
+        if (cr == null) {
+            return adapted;
+        }
+        adapted.setNodeIdentifier(cr.getProposedNodeIdentifier());
+        return adapted;
+    }
+
+    /**
+     * Rebuilds a connection request from its adapted form.
+     *
+     * @param aCr the adapted request; dereferenced without a null check
+     * @return a new request constructed from the adapted node identifier
+     */
+    @Override
+    public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) {
+        return new ConnectionRequest(aCr.getNodeIdentifier());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
new file mode 100644
index 0000000..143bab0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+
+/**
+ * JAXB adapter converting between {@link ConnectionResponse} and
+ * {@code AdaptedConnectionResponse}.
+ *
+ * @author unattributed
+ */
+public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionResponse, ConnectionResponse> {
+
+    /**
+     * Copies every property of a connection response into a new adapted object.
+     *
+     * @param cr the response to convert; may be {@code null}
+     * @return a new adapted response; when {@code cr} is {@code null} the returned object
+     *         is left unpopulated (never {@code null})
+     */
+    @Override
+    public AdaptedConnectionResponse marshal(final ConnectionResponse cr) {
+        final AdaptedConnectionResponse adapted = new AdaptedConnectionResponse();
+        if (cr == null) {
+            return adapted;
+        }
+
+        adapted.setDataFlow(cr.getDataFlow());
+        adapted.setNodeIdentifier(cr.getNodeIdentifier());
+        adapted.setTryLaterSeconds(cr.getTryLaterSeconds());
+        adapted.setBlockedByFirewall(cr.isBlockedByFirewall());
+        adapted.setPrimary(cr.isPrimary());
+        adapted.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
+        adapted.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
+        adapted.setInstanceId(cr.getInstanceId());
+        return adapted;
+    }
+
+    /**
+     * Rebuilds a connection response from its adapted form, choosing one of three shapes.
+     * <p>
+     * The checks are ordered: a try-later response wins over a blocked-by-firewall
+     * response, and only when neither applies is a full response built.
+     *
+     * @param aCr the adapted response; dereferenced without a null check
+     * @return a try-later response, a blocked-by-firewall response, or a response built
+     *         from the identifier, data flow, primary flag, manager remote input port,
+     *         manager secure flag and instance id
+     */
+    @Override
+    public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) {
+        if (aCr.shouldTryLater()) {
+            return new ConnectionResponse(aCr.getTryLaterSeconds());
+        }
+        if (aCr.isBlockedByFirewall()) {
+            return ConnectionResponse.createBlockedByFirewallResponse();
+        }
+        return new ConnectionResponse(
+                aCr.getNodeIdentifier(),
+                aCr.getDataFlow(),
+                aCr.isPrimary(),
+                aCr.getManagerRemoteInputPort(),
+                aCr.isManagerRemoteCommsSecure(),
+                aCr.getInstanceId());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
new file mode 100644
index 0000000..8d9467f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * JAXB adapter converting between {@link StandardDataFlow} and {@link AdaptedDataFlow}.
+ *
+ * @author unattributed
+ */
+public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlow> {
+
+    /**
+     * Copies the flow, templates, snippets and auto-start flag into a new adapted object.
+     *
+     * @param df the data flow to convert; may be {@code null}
+     * @return a new adapted data flow; when {@code df} is {@code null} the returned object
+     *         is left unpopulated (never {@code null})
+     */
+    @Override
+    public AdaptedDataFlow marshal(final StandardDataFlow df) {
+        final AdaptedDataFlow adapted = new AdaptedDataFlow();
+        if (df == null) {
+            return adapted;
+        }
+
+        adapted.setFlow(df.getFlow());
+        adapted.setTemplates(df.getTemplates());
+        adapted.setSnippets(df.getSnippets());
+        adapted.setAutoStartProcessors(df.isAutoStartProcessors());
+        return adapted;
+    }
+
+    /**
+     * Rebuilds a data flow from its adapted form.
+     *
+     * @param aDf the adapted data flow; dereferenced without a null check
+     * @return a new data flow built from the three byte arrays, with the auto-start flag
+     *         applied afterwards through its setter
+     */
+    @Override
+    public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) {
+        final StandardDataFlow restored =
+                new StandardDataFlow(aDf.getFlow(), aDf.getTemplates(), aDf.getSnippets());
+        restored.setAutoStartProcessors(aDf.isAutoStartProcessors());
+        return restored;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
new file mode 100644
index 0000000..0e073b6
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+
+/**
+ * JAXB adapter converting between {@link Heartbeat} and {@link AdaptedHeartbeat}.
+ *
+ * @author unattributed
+ */
+public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
+
+    /**
+     * Copies the node identifier, payload, primary flag and connected flag into a new
+     * adapted object.
+     *
+     * @param hb the heartbeat to convert; may be {@code null}
+     * @return a new adapted heartbeat; when {@code hb} is {@code null} the returned object
+     *         is left unpopulated (never {@code null})
+     */
+    @Override
+    public AdaptedHeartbeat marshal(final Heartbeat hb) {
+        final AdaptedHeartbeat adapted = new AdaptedHeartbeat();
+        if (hb == null) {
+            return adapted;
+        }
+
+        adapted.setNodeIdentifier(hb.getNodeIdentifier());
+        adapted.setPayload(hb.getPayload());
+        adapted.setPrimary(hb.isPrimary());
+        adapted.setConnected(hb.isConnected());
+        return adapted;
+    }
+
+    /**
+     * Rebuilds a heartbeat from its adapted form.
+     *
+     * @param aHb the adapted heartbeat; dereferenced without a null check
+     * @return a new heartbeat built from the identifier, primary flag, connected flag and
+     *         payload, in that constructor order
+     */
+    @Override
+    public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
+        return new Heartbeat(
+                aHb.getNodeIdentifier(),
+                aHb.isPrimary(),
+                aHb.isConnected(),
+                aHb.getPayload());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
new file mode 100644
index 0000000..c3a57f5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+/**
+ * Holder for the shared {@link JAXBContext} of this package.
+ * <p>
+ * The class exposes static members only and cannot be instantiated.
+ *
+ * @author unattributed
+ */
+public final class JaxbProtocolUtils {
+
+    /**
+     * Context path handed to JAXB: the name of the package that contains
+     * {@code ObjectFactory}.
+     * <p>
+     * Must stay declared before {@link #JAXB_CONTEXT}: static initializers run in textual
+     * order and the context is built from this value.
+     */
+    public static final String JAXB_CONTEXT_PATH = ObjectFactory.class.getPackage().getName();
+
+    /**
+     * Context created once, during class initialization, from {@link #JAXB_CONTEXT_PATH}.
+     */
+    public static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Not instantiable: this class only carries static members.
+     */
+    private JaxbProtocolUtils() {
+    }
+
+    /**
+     * Creates the JAXB context for {@link #JAXB_CONTEXT_PATH}.
+     *
+     * @return the newly created context
+     * @throws RuntimeException wrapping the {@link JAXBException} if the context cannot be
+     *         created; because this runs in a static initializer, the failure surfaces
+     *         when the class is first initialized
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_CONTEXT_PATH);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
new file mode 100644
index 0000000..1ae41f7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.NodeBulletins;
+
+/**
+ * JAXB adapter converting between {@link NodeBulletins} and {@link AdaptedNodeBulletins}.
+ *
+ * @author unattributed
+ */
+public class NodeBulletinsAdapter extends XmlAdapter<AdaptedNodeBulletins, NodeBulletins> {
+
+    /**
+     * Copies the node identifier and payload into a new adapted object.
+     *
+     * @param hb the node bulletins to convert; may be {@code null}
+     * @return a new adapted object; when {@code hb} is {@code null} the returned object is
+     *         left unpopulated (never {@code null})
+     */
+    @Override
+    public AdaptedNodeBulletins marshal(final NodeBulletins hb) {
+        final AdaptedNodeBulletins result = new AdaptedNodeBulletins();
+        if (hb == null) {
+            return result;
+        }
+
+        result.setNodeIdentifier(hb.getNodeIdentifier());
+        result.setPayload(hb.getPayload());
+        return result;
+    }
+
+    /**
+     * Rebuilds node bulletins from their adapted form.
+     *
+     * @param adaptedBulletins the adapted object; dereferenced without a null check
+     * @return new node bulletins built from the adapted identifier and payload
+     */
+    @Override
+    public NodeBulletins unmarshal(final AdaptedNodeBulletins adaptedBulletins) {
+        return new NodeBulletins(
+                adaptedBulletins.getNodeIdentifier(),
+                adaptedBulletins.getPayload());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
new file mode 100644
index 0000000..fe2d8a4
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * JAXB adapter converting between {@link NodeIdentifier} and {@link AdaptedNodeIdentifier}.
+ * <p>
+ * Both directions are null-preserving: a {@code null} input yields a {@code null} result.
+ *
+ * @author unattributed
+ */
+public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, NodeIdentifier> {
+
+    /**
+     * Copies the id, API address/port and socket address/port into a new adapted object.
+     *
+     * @param ni the node identifier to convert; may be {@code null}
+     * @return the adapted identifier, or {@code null} when {@code ni} is {@code null}
+     */
+    @Override
+    public AdaptedNodeIdentifier marshal(final NodeIdentifier ni) {
+        if (ni == null) {
+            return null;
+        }
+
+        final AdaptedNodeIdentifier adapted = new AdaptedNodeIdentifier();
+        adapted.setId(ni.getId());
+        adapted.setApiAddress(ni.getApiAddress());
+        adapted.setApiPort(ni.getApiPort());
+        adapted.setSocketAddress(ni.getSocketAddress());
+        adapted.setSocketPort(ni.getSocketPort());
+        return adapted;
+    }
+
+    /**
+     * Rebuilds a node identifier from its adapted form.
+     *
+     * @param aNi the adapted identifier; may be {@code null}
+     * @return a new node identifier built from the five adapted values, or {@code null}
+     *         when {@code aNi} is {@code null}
+     */
+    @Override
+    public NodeIdentifier unmarshal(final AdaptedNodeIdentifier aNi) {
+        if (aNi == null) {
+            return null;
+        }
+
+        return new NodeIdentifier(
+                aNi.getId(),
+                aNi.getApiAddress(),
+                aNi.getApiPort(),
+                aNi.getSocketAddress(),
+                aNi.getSocketPort());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
new file mode 100644
index 0000000..1613536
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.XmlRegistry;
+
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.PingMessage;
+import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
+
+/**
+ * JAXB registry ({@link XmlRegistry}) that exposes one no-argument creation
+ * method per cluster protocol message class imported by this file.
+ *
+ * @author unattributed
+ */
+@XmlRegistry
+public class ObjectFactory {
+
+    public ObjectFactory() {
+    }
+
+    // ---- connection / disconnection ----
+
+    /** @return a newly constructed {@link ConnectionRequestMessage} */
+    public ConnectionRequestMessage createConnectionRequestMessage() {
+        return new ConnectionRequestMessage();
+    }
+
+    /** @return a newly constructed {@link ConnectionResponseMessage} */
+    public ConnectionResponseMessage createConnectionResponseMessage() {
+        return new ConnectionResponseMessage();
+    }
+
+    /** @return a newly constructed {@link DisconnectMessage} */
+    public DisconnectMessage createDisconnectionMessage() {
+        return new DisconnectMessage();
+    }
+
+    // ---- reconnection ----
+
+    /** @return a newly constructed {@link ReconnectionRequestMessage} */
+    public ReconnectionRequestMessage createReconnectionRequestMessage() {
+        return new ReconnectionRequestMessage();
+    }
+
+    /** @return a newly constructed {@link ReconnectionResponseMessage} */
+    public ReconnectionResponseMessage createReconnectionResponseMessage() {
+        return new ReconnectionResponseMessage();
+    }
+
+    /** @return a newly constructed {@link ReconnectionFailureMessage} */
+    public ReconnectionFailureMessage createReconnectionFailureMessage() {
+        return new ReconnectionFailureMessage();
+    }
+
+    // ---- flow ----
+
+    /** @return a newly constructed {@link FlowRequestMessage} */
+    public FlowRequestMessage createFlowRequestMessage() {
+        return new FlowRequestMessage();
+    }
+
+    /** @return a newly constructed {@link FlowResponseMessage} */
+    public FlowResponseMessage createFlowResponseMessage() {
+        return new FlowResponseMessage();
+    }
+
+    // ---- remaining message types ----
+
+    /** @return a newly constructed {@link HeartbeatMessage} */
+    public HeartbeatMessage createHeartbeatMessage() {
+        return new HeartbeatMessage();
+    }
+
+    /** @return a newly constructed {@link PingMessage} */
+    public PingMessage createPingMessage() {
+        return new PingMessage();
+    }
+
+    /** @return a newly constructed {@link NodeBulletinsMessage} */
+    public NodeBulletinsMessage createBulletinsMessage() {
+        return new NodeBulletinsMessage();
+    }
+
+    /** @return a newly constructed {@link PrimaryRoleAssignmentMessage} */
+    public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
+        return new PrimaryRoleAssignmentMessage();
+    }
+
+    /** @return a newly constructed {@link ControllerStartupFailureMessage} */
+    public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
+        return new ControllerStartupFailureMessage();
+    }
+
+    /** @return a newly constructed {@link ServiceBroadcastMessage} */
+    public ServiceBroadcastMessage createServiceBroadcastMessage() {
+        return new ServiceBroadcastMessage();
+    }
+
+    /** @return a newly constructed {@link MulticastProtocolMessage} */
+    public MulticastProtocolMessage createMulticastProtocolMessage() {
+        return new MulticastProtocolMessage();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java
new file mode 100644
index 0000000..344de4e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+
+/**
+ * Protocol message that carries a {@link ConnectionRequest}. Bound to the XML
+ * root element <code>connectionRequestMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "connectionRequestMessage")
+public class ConnectionRequestMessage extends ProtocolMessage {
+
+    private ConnectionRequest connectionRequest;
+
+    public ConnectionRequestMessage() {
+    }
+
+    /**
+     * @return the connection request held by this message, or
+     * <code>null</code> if none has been set
+     */
+    public ConnectionRequest getConnectionRequest() {
+        return this.connectionRequest;
+    }
+
+    /**
+     * @param connectionRequest the connection request this message should carry
+     */
+    public void setConnectionRequest(final ConnectionRequest connectionRequest) {
+        this.connectionRequest = connectionRequest;
+    }
+
+    /**
+     * @return always {@link MessageType#CONNECTION_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.CONNECTION_REQUEST;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java
new file mode 100644
index 0000000..a262d7a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+
+/**
+ * Protocol message that carries a {@link ConnectionResponse} together with
+ * the DN of the cluster manager. Whichever of the two is set last, the DN is
+ * pushed into the held response so that both stay in agreement. Bound to the
+ * XML root element <code>connectionResponseMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "connectionResponseMessage")
+public class ConnectionResponseMessage extends ProtocolMessage {
+
+    private ConnectionResponse connectionResponse;
+    private String clusterManagerDN;
+
+    public ConnectionResponseMessage() {}
+
+    /**
+     * @return the connection response held by this message, or
+     * <code>null</code> if none has been set
+     */
+    public ConnectionResponse getConnectionResponse() {
+        return connectionResponse;
+    }
+
+    /**
+     * Sets the connection response. If a cluster manager DN has already been
+     * set on this message, it is applied to the given response.
+     *
+     * @param connectionResponse the response to carry; may be <code>null</code>
+     */
+    public void setConnectionResponse(final ConnectionResponse connectionResponse) {
+        this.connectionResponse = connectionResponse;
+
+        // Only propagate the DN when there is a response to receive it; without
+        // the null check, clearing the response after a DN was set would throw
+        // a NullPointerException.
+        if ( connectionResponse != null && clusterManagerDN != null ) {
+            connectionResponse.setClusterManagerDN(clusterManagerDN);
+        }
+    }
+
+    /**
+     * Sets the DN of the cluster manager on this message and, if a connection
+     * response is already present, on that response as well.
+     *
+     * @param dn the cluster manager DN; may be <code>null</code>
+     */
+    public void setClusterManagerDN(final String dn) {
+        if ( connectionResponse != null ) {
+            connectionResponse.setClusterManagerDN(dn);
+        }
+        this.clusterManagerDN = dn;
+    }
+
+    /**
+     * Returns the DN of the NCM, if it is available or <code>null</code> otherwise.
+     *
+     * @return the cluster manager DN last passed to
+     * {@link #setClusterManagerDN(String)}, or <code>null</code> if none was set
+     */
+    public String getClusterManagerDN() {
+        return clusterManagerDN;
+    }
+
+    /**
+     * @return always {@link MessageType#CONNECTION_RESPONSE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.CONNECTION_RESPONSE;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
new file mode 100644
index 0000000..ebc1cae
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * An {@link ExceptionMessage} of type
+ * {@link MessageType#CONTROLLER_STARTUP_FAILURE} that additionally carries a
+ * {@link NodeIdentifier}. Bound to the XML root element
+ * <code>controllerStartupFailureMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "controllerStartupFailureMessage")
+public class ControllerStartupFailureMessage extends ExceptionMessage {
+
+    private NodeIdentifier nodeId;
+
+    public ControllerStartupFailureMessage() {
+    }
+
+    /**
+     * @return always {@link MessageType#CONTROLLER_STARTUP_FAILURE}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.CONTROLLER_STARTUP_FAILURE;
+    }
+
+    /**
+     * @return the node identifier held by this message, or <code>null</code>
+     * if none has been set; converted for XML binding by
+     * {@link NodeIdentifierAdapter}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @param nodeId the node identifier this message should carry
+     */
+    public void setNodeId(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java
new file mode 100644
index 0000000..8aa7a40
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * Protocol message of type {@link MessageType#DISCONNECTION_REQUEST} that
+ * carries a {@link NodeIdentifier} and a free-form explanation. Bound to the
+ * XML root element <code>disconnectionMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "disconnectionMessage")
+public class DisconnectMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId;
+    private String explanation;
+
+    /**
+     * @return the node identifier held by this message, or <code>null</code>
+     * if none has been set; converted for XML binding by
+     * {@link NodeIdentifierAdapter}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @param nodeId the node identifier this message should carry
+     */
+    public void setNodeId(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return the explanation text, or <code>null</code> if none has been set
+     */
+    public String getExplanation() {
+        return this.explanation;
+    }
+
+    /**
+     * @param explanation the explanation text this message should carry
+     */
+    public void setExplanation(final String explanation) {
+        this.explanation = explanation;
+    }
+
+    /**
+     * @return always {@link MessageType#DISCONNECTION_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.DISCONNECTION_REQUEST;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java
new file mode 100644
index 0000000..99a6dee
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Protocol message of type {@link MessageType#EXCEPTION} that carries a
+ * textual exception message. Bound to the XML root element
+ * <code>exceptionMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "exceptionMessage")
+public class ExceptionMessage extends ProtocolMessage {
+
+    private String exceptionMessage;
+
+    public ExceptionMessage() {
+    }
+
+    /**
+     * @return the exception text, or <code>null</code> if none has been set
+     */
+    public String getExceptionMessage() {
+        return this.exceptionMessage;
+    }
+
+    /**
+     * @param exceptionMessage the exception text this message should carry
+     */
+    public void setExceptionMessage(final String exceptionMessage) {
+        this.exceptionMessage = exceptionMessage;
+    }
+
+    /**
+     * @return always {@link MessageType#EXCEPTION}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.EXCEPTION;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java
new file mode 100644
index 0000000..4a10538
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+/**
+ * Protocol message of type {@link MessageType#FLOW_REQUEST} that carries a
+ * {@link NodeIdentifier}. Bound to the XML root element
+ * <code>flowRequestMessage</code>.
+ *
+ * @author unattributed
+ */
+@XmlRootElement(name = "flowRequestMessage")
+public class FlowRequestMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId;
+
+    /**
+     * @return the node identifier held by this message, or <code>null</code>
+     * if none has been set; converted for XML binding by
+     * {@link NodeIdentifierAdapter}
+     */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return this.nodeId;
+    }
+
+    /**
+     * @param nodeId the node identifier this message should carry
+     */
+    public void setNodeId(final NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return always {@link MessageType#FLOW_REQUEST}
+     */
+    @Override
+    public MessageType getType() {
+        return MessageType.FLOW_REQUEST;
+    }
+
+}