You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:04:10 UTC
[21/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
deleted file mode 100644
index e9e7d5b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster;
-import org.apache.nifi.io.socket.multicast.MulticastUtils;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Broadcasts services used by the clustering software using multicast communication.
- * All registered services are announced in one pass, and passes are repeated
- * with a configurable delay between them.
- *
- * The client caller is responsible for starting and stopping the broadcasting.
- * The instance must be stopped before termination of the JVM to ensure proper
- * resource clean-up.
- *
- * @author unattributed
- */
-public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster {
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class));
-
-    // services to announce; iterated by the timer thread while callers add/remove
-    private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>();
-
-    private final InetSocketAddress multicastAddress;
-
-    private final MulticastConfiguration multicastConfiguration;
-
-    private final ProtocolContext<ProtocolMessage> protocolContext;
-
-    private final int broadcastDelayMs;
-
-    // non-null only while the instance is started (see isRunning())
-    private Timer broadcaster;
-
-    private MulticastSocket multicastSocket;
-
-    /**
-     * Creates a broadcaster with no registered services; use
-     * {@link #addService(DiscoverableService)} to register them.
-     *
-     * @param multicastAddress the multicast group address and port to send to
-     * @param multicastConfiguration the configuration used to create the multicast socket
-     * @param protocolContext the context used to create message marshallers
-     * @param broadcastDelay a time duration string (parsed by FormatUtils) giving
-     * the delay between broadcast passes
-     *
-     * @throws IllegalArgumentException if any of the first three arguments is null
-     * or the address is not a multicast address
-     */
-    public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress,
-            final MulticastConfiguration multicastConfiguration,
-            final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) {
-
-        if (multicastAddress == null) {
-            throw new IllegalArgumentException("Multicast address may not be null.");
-        } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
-            throw new IllegalArgumentException("Multicast group address is not a Class D IP address.");
-        } else if (protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        } else if (multicastConfiguration == null) {
-            throw new IllegalArgumentException("Multicast configuration may not be null.");
-        }
-
-        // note: the former "this.services.addAll(services)" added the (empty) set
-        // to itself and has been removed; services start out empty
-        this.multicastAddress = multicastAddress;
-        this.multicastConfiguration = multicastConfiguration;
-        this.protocolContext = protocolContext;
-        this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Opens the multicast socket and begins broadcasting immediately, repeating
-     * every {@link #getBroadcastDelayMs()} milliseconds.
-     *
-     * @throws IOException if the multicast socket cannot be created
-     * @throws IllegalStateException if the instance is already started
-     */
-    public void start() throws IOException {
-
-        if (isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-
-        // setup socket; it must be assigned before scheduling because the first
-        // broadcast is scheduled with no initial delay
-        multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration);
-
-        // setup broadcaster
-        final Timer timer = new Timer("Cluster Services Broadcaster", /* is daemon */ true);
-        try {
-            timer.schedule(new TimerTask() {
-                @Override
-                public void run() {
-                    for (final DiscoverableService service : services) {
-                        broadcast(service);
-                    }
-                }
-            }, 0, broadcastDelayMs);
-        } catch (final RuntimeException e) {
-            // scheduling was rejected (e.g. non-positive delay): release what was
-            // acquired so the instance is not left half-started
-            timer.cancel();
-            MulticastUtils.closeQuietly(multicastSocket);
-            multicastSocket = null;
-            throw e;
-        }
-
-        // only publish the timer once scheduling succeeded
-        broadcaster = timer;
-    }
-
-    /**
-     * Marshals a broadcast message for the given service and sends it to the
-     * multicast address. Failures are logged and never propagated so that one
-     * failing service does not prevent the others from being broadcast.
-     */
-    private void broadcast(final DiscoverableService service) {
-        try {
-
-            final InetSocketAddress serviceAddress = service.getServiceAddress();
-            if (logger.isDebugEnabled()) {
-                logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d",
-                        service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
-            }
-
-            // create message
-            final ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
-            msg.setServiceName(service.getServiceName());
-            msg.setAddress(serviceAddress.getHostName());
-            msg.setPort(serviceAddress.getPort());
-
-            // marshal message to output stream
-            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            marshaller.marshal(msg, baos);
-            final byte[] packetBytes = baos.toByteArray();
-
-            // send message
-            final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress);
-            multicastSocket.send(packet);
-
-        } catch (final Exception ex) {
-            logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex);
-        }
-    }
-
-    /**
-     * @return true if the instance has been started and not yet stopped
-     */
-    public boolean isRunning() {
-        return (broadcaster != null);
-    }
-
-    /**
-     * Cancels the broadcast timer and closes the multicast socket.
-     *
-     * @throws IllegalStateException if the instance is not running
-     */
-    public void stop() {
-
-        if (isRunning() == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-
-        broadcaster.cancel();
-        broadcaster = null;
-
-        // close socket and drop the reference to the closed instance
-        MulticastUtils.closeQuietly(multicastSocket);
-        multicastSocket = null;
-
-    }
-
-    @Override
-    public int getBroadcastDelayMs() {
-        return broadcastDelayMs;
-    }
-
-    /**
-     * @return a read-only view of the registered services
-     */
-    @Override
-    public Set<DiscoverableService> getServices() {
-        return Collections.unmodifiableSet(services);
-    }
-
-    @Override
-    public InetSocketAddress getMulticastAddress() {
-        return multicastAddress;
-    }
-
-    @Override
-    public boolean addService(final DiscoverableService service) {
-        return services.add(service);
-    }
-
-    /**
-     * Removes the first registered service whose name equals the given name.
-     *
-     * @return true if a service was removed; false if none matched
-     */
-    @Override
-    public boolean removeService(final String serviceName) {
-        for (final DiscoverableService service : services) {
-            if (service.getServiceName().equals(serviceName)) {
-                return services.remove(service);
-            }
-        }
-        return false;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
deleted file mode 100644
index 680df65..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * An input stream wrapper that keeps an in-memory copy of the first
- * <code>maxBytesToCopy</code> bytes read through it. Reads are always passed
- * through to the wrapped stream unchanged; only the copy is capped.
- */
-public class CopyingInputStream extends FilterInputStream {
-    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    private final int maxBytesToCopy;
-
-    /**
-     * @param in the stream to read from
-     * @param maxBytesToCopy the maximum number of bytes to retain; a value of
-     * zero or less retains nothing
-     */
-    public CopyingInputStream(final InputStream in, final int maxBytesToCopy) {
-        // the wrapped stream is kept by FilterInputStream in its protected "in" field
-        super(in);
-        this.maxBytesToCopy = maxBytesToCopy;
-    }
-
-    @Override
-    public int read() throws IOException {
-        final int delegateRead = in.read();
-        if (delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy) {
-            baos.write(delegateRead);
-        }
-
-        return delegateRead;
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        final int delegateRead = in.read(b);
-        copy(b, 0, delegateRead);
-        return delegateRead;
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        final int delegateRead = in.read(b, off, len);
-        copy(b, off, delegateRead);
-        return delegateRead;
-    }
-
-    /**
-     * Retains as many of the just-read bytes as the remaining capacity allows.
-     * Does nothing at end-of-stream, for empty reads, or when the capacity is
-     * exhausted (or was never positive), so a negative length is never handed
-     * to the underlying buffer.
-     */
-    private void copy(final byte[] b, final int off, final int bytesRead) {
-        final int remaining = maxBytesToCopy - getNumberOfBytesCopied();
-        if (bytesRead > 0 && remaining > 0) {
-            baos.write(b, off, Math.min(bytesRead, remaining));
-        }
-    }
-
-    /**
-     * @return a copy of the bytes retained so far
-     */
-    public byte[] getBytesRead() {
-        return baos.toByteArray();
-    }
-
-    /**
-     * Writes the bytes retained so far to the given stream.
-     *
-     * @param out the destination stream
-     * @throws IOException if writing to <code>out</code> fails
-     */
-    public void writeBytes(final OutputStream out) throws IOException {
-        baos.writeTo(out);
-    }
-
-    /**
-     * @return the number of bytes retained so far
-     */
-    public int getNumberOfBytesCopied() {
-        return baos.size();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
deleted file mode 100644
index d3764b3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastListener;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.events.BulletinFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for protocol messages sent over multicast. If a message
- * is of type MulticastProtocolMessage, then the underlying protocol message is
- * passed to the handler. If the receiving handler produces a message response,
- * then the message is wrapped with a MulticastProtocolMessage before being
- * sent to the originator.
- *
- * The client caller is responsible for starting and stopping the listener.
- * The instance must be stopped before termination of the JVM to ensure proper
- * resource clean-up.
- *
- * @author unattributed
- */
-public class MulticastProtocolListener extends MulticastListener implements ProtocolListener {
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class));
-
-    // immutable members
-    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
-    // identifies messages sent by this listener so they can be ignored when received back
-    private final String listenerId = UUID.randomUUID().toString();
-    private final ProtocolContext<ProtocolMessage> protocolContext;
-
-    // mutable: optional, may be set or replaced at any time via setBulletinRepository
-    private volatile BulletinRepository bulletinRepository;
-
-    /**
-     * @param numThreads passed through to the MulticastListener super class
-     * @param multicastAddress passed through to the MulticastListener super class
-     * @param configuration passed through to the MulticastListener super class
-     * @param protocolContext the context used to create marshallers and unmarshallers
-     *
-     * @throws IllegalArgumentException if the protocol context is null
-     */
-    public MulticastProtocolListener(
-            final int numThreads,
-            final InetSocketAddress multicastAddress,
-            final MulticastConfiguration configuration,
-            final ProtocolContext<ProtocolMessage> protocolContext) {
-
-        super(numThreads, multicastAddress, configuration);
-
-        if (protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        }
-        this.protocolContext = protocolContext;
-    }
-
-    @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        this.bulletinRepository = bulletinRepository;
-    }
-
-    /**
-     * @throws IllegalStateException if the listener is already running
-     */
-    @Override
-    public void start() throws IOException {
-
-        if (super.isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-
-        super.start();
-
-    }
-
-    /**
-     * @throws IllegalStateException if the listener is not running
-     */
-    @Override
-    public void stop() throws IOException {
-
-        if (super.isRunning() == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-
-        // shutdown listener
-        super.stop();
-
-    }
-
-    /**
-     * @return a read-only view of the registered handlers
-     */
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        return Collections.unmodifiableCollection(handlers);
-    }
-
-    @Override
-    public void addHandler(final ProtocolHandler handler) {
-        if (handler == null) {
-            throw new NullPointerException("Protocol handler may not be null.");
-        }
-        handlers.add(handler);
-    }
-
-    @Override
-    public boolean removeHandler(final ProtocolHandler handler) {
-        return handlers.remove(handler);
-    }
-
-    /**
-     * Unmarshals the packet, unwraps it if it is a MulticastProtocolMessage,
-     * hands the underlying message to the first handler that can handle it and,
-     * if the handler returns a response, multicasts that response. Messages that
-     * carry this listener's own id are ignored. Any failure is logged and, when
-     * a bulletin repository is set, reported as a bulletin; nothing is thrown.
-     *
-     * @param multicastSocket the socket used to send a response
-     * @param packet the received datagram
-     */
-    @Override
-    public void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) {
-
-        try {
-
-            // unmarshall message; honor the packet's offset into its backing buffer
-            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
-            final ProtocolMessage request = unmarshaller.unmarshal(
-                    new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength()));
-
-            // unwrap multicast message, if necessary
-            final ProtocolMessage unwrappedRequest;
-            if (request instanceof MulticastProtocolMessage) {
-                final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request;
-                // don't process a message we sent
-                if (listenerId.equals(multicastRequest.getId())) {
-                    return;
-                }
-                unwrappedRequest = multicastRequest.getProtocolMessage();
-            } else {
-                unwrappedRequest = request;
-            }
-
-            // if no handler found, throw exception; otherwise handle request
-            final ProtocolHandler desiredHandler = findHandler(unwrappedRequest);
-            if (desiredHandler == null) {
-                throw new ProtocolException("No handler assigned to handle message type: " + unwrappedRequest.getType());
-            }
-
-            // the handler was selected for the unwrapped message, so that is what it must receive
-            final ProtocolMessage response = desiredHandler.handle(unwrappedRequest);
-            if (response != null) {
-                sendResponse(multicastSocket, desiredHandler, unwrappedRequest, response);
-            }
-
-        } catch (final Throwable t) {
-            logger.warn("Failed processing protocol message due to " + t, t);
-
-            if (bulletinRepository != null) {
-                final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString());
-                bulletinRepository.addBulletin(bulletin);
-            }
-        }
-    }
-
-    /**
-     * Returns the first registered handler that reports it can handle the
-     * message, or null if there is none.
-     */
-    private ProtocolHandler findHandler(final ProtocolMessage message) {
-        for (final ProtocolHandler handler : handlers) {
-            if (handler.canHandle(message)) {
-                return handler;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Wraps the response with this listener's id, marshals it and multicasts it.
-     * A response larger than the configured max packet size is logged but still sent.
-     */
-    private void sendResponse(final MulticastSocket multicastSocket, final ProtocolHandler handler,
-            final ProtocolMessage handledRequest, final ProtocolMessage response) {
-        try {
-
-            // wrap with listener id
-            final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response);
-
-            // marshal message
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-            marshaller.marshal(multicastResponse, baos);
-            final byte[] responseBytes = baos.toByteArray();
-
-            final int maxPacketSizeBytes = getMaxPacketSizeBytes();
-            if (responseBytes.length > maxPacketSizeBytes) {
-                logger.warn("Cluster protocol handler '" + handler.getClass() +
-                        "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
-            }
-
-            // create and send packet
-            final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort());
-            multicastSocket.send(responseDatagram);
-
-        } catch (final IOException ioe) {
-            throw new ProtocolException("Failed marshalling protocol message in response to message type: " + handledRequest.getType() + " due to: " + ioe, ioe);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
deleted file mode 100644
index dc86d24..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import javax.net.ssl.SSLSocket;
-import javax.security.cert.X509Certificate;
-
-import org.apache.nifi.cluster.protocol.NodeProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-
-public class NodeProtocolSenderImpl implements NodeProtocolSender {
- private final SocketConfiguration socketConfiguration;
- private final ClusterServiceLocator clusterManagerProtocolServiceLocator;
- private final ProtocolContext<ProtocolMessage> protocolContext;
-
- public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator,
- final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
- if(clusterManagerProtocolServiceLocator == null) {
- throw new IllegalArgumentException("Protocol Service Locator may not be null.");
- } else if(socketConfiguration == null) {
- throw new IllegalArgumentException("Socket configuration may not be null.");
- } else if(protocolContext == null) {
- throw new IllegalArgumentException("Protocol Context may not be null.");
- }
-
- this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator;
- this.socketConfiguration = socketConfiguration;
- this.protocolContext = protocolContext;
- }
-
-
- @Override
- public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
- Socket socket = null;
- try {
- socket = createSocket();
-
- String ncmDn = null;
- if ( socket instanceof SSLSocket ) {
- final SSLSocket sslSocket = (SSLSocket) socket;
- try {
- final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
- if ( certChains != null && certChains.length > 0 ) {
- ncmDn = certChains[0].getSubjectDN().getName();
- }
- } catch (final ProtocolException pe) {
- throw pe;
- } catch (final Exception e) {
- throw new ProtocolException(e);
- }
- }
-
- try {
- // marshal message to output stream
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
-
- final ProtocolMessage response;
- try {
- // unmarshall response and return
- final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
- response = unmarshaller.unmarshal(socket.getInputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
- }
-
- if(MessageType.CONNECTION_RESPONSE == response.getType()) {
- final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response;
- connectionResponse.setClusterManagerDN(ncmDn);
- return connectionResponse;
- } else {
- throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
-
- @Override
- public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
- sendProtocolMessage(msg);
- }
-
- @Override
- public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
- sendProtocolMessage(msg);
- }
-
- @Override
- public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
- sendProtocolMessage(msg);
- }
-
- @Override
- public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
- sendProtocolMessage(msg);
- }
-
- private Socket createSocket() {
- // determine the cluster manager's address
- final DiscoverableService service = clusterManagerProtocolServiceLocator.getService();
- if(service == null) {
- throw new UnknownServiceAddressException("Cluster Manager's service is not known. Verify a cluster manager is running.");
- }
-
- try {
- // create a socket
- return SocketUtils.createSocket(service.getServiceAddress(), socketConfiguration);
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
- }
- }
-
- private void sendProtocolMessage(final ProtocolMessage msg) {
- Socket socket = null;
- try {
- socket = createSocket();
-
- try {
- // marshal message to output stream
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
- public SocketConfiguration getSocketConfiguration() {
- return socketConfiguration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
deleted file mode 100644
index 4b359f4..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.cluster.protocol.NodeProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-import org.apache.nifi.reporting.BulletinRepository;
-
-/**
- * Combines a NodeProtocolSender and a ProtocolListener behind a single object.
- * Every call is forwarded unchanged to the corresponding delegate; the only
- * logic added here is the started/stopped state check in start() and stop().
- */
-public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
-
-    private final NodeProtocolSender delegateSender;
-    private final ProtocolListener delegateListener;
-
-    /**
-     * @param sender the delegate for all sending operations
-     * @param listener the delegate for all listening operations
-     *
-     * @throws IllegalArgumentException if either argument is null
-     */
-    public NodeProtocolSenderListener(final NodeProtocolSender sender, final ProtocolListener listener) {
-        if (sender == null) {
-            throw new IllegalArgumentException("NodeProtocolSender may not be null.");
-        }
-        if (listener == null) {
-            throw new IllegalArgumentException("ProtocolListener may not be null.");
-        }
-        delegateSender = sender;
-        delegateListener = listener;
-    }
-
-    // ---- listener operations ----
-
-    /**
-     * Starts the underlying listener.
-     *
-     * @throws IllegalStateException if the listener is already running
-     */
-    @Override
-    public void start() throws IOException {
-        final boolean alreadyRunning = isRunning();
-        if (alreadyRunning) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-        delegateListener.start();
-    }
-
-    /**
-     * Stops the underlying listener.
-     *
-     * @throws IllegalStateException if the listener is not running
-     */
-    @Override
-    public void stop() throws IOException {
-        final boolean running = isRunning();
-        if (running == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-        delegateListener.stop();
-    }
-
-    @Override
-    public boolean isRunning() {
-        return delegateListener.isRunning();
-    }
-
-    @Override
-    public void addHandler(final ProtocolHandler handler) {
-        delegateListener.addHandler(handler);
-    }
-
-    @Override
-    public boolean removeHandler(final ProtocolHandler handler) {
-        return delegateListener.removeHandler(handler);
-    }
-
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        return delegateListener.getHandlers();
-    }
-
-    @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        delegateListener.setBulletinRepository(bulletinRepository);
-    }
-
-    // ---- sender operations ----
-
-    @Override
-    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        return delegateSender.requestConnection(msg);
-    }
-
-    @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        delegateSender.heartbeat(msg);
-    }
-
-    @Override
-    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        delegateSender.sendBulletins(msg);
-    }
-
-    @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        delegateSender.notifyControllerStartupFailure(msg);
-    }
-
-    @Override
-    public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        delegateSender.notifyReconnectionFailure(msg);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
deleted file mode 100644
index ca30d9b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLSocket;
-import javax.security.cert.X509Certificate;
-
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketListener;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.util.StopWatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for protocol messages sent over unicast socket.
- *
- * @author unattributed
- */
-public class SocketProtocolListener extends SocketListener implements ProtocolListener {
-
- private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class));
- private final ProtocolContext<ProtocolMessage> protocolContext;
- private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
- private volatile BulletinRepository bulletinRepository;
-
- public SocketProtocolListener(
- final int numThreads,
- final int port,
- final ServerSocketConfiguration configuration,
- final ProtocolContext<ProtocolMessage> protocolContext) {
-
- super(numThreads, port, configuration);
-
- if(protocolContext == null) {
- throw new IllegalArgumentException("Protocol Context may not be null.");
- }
-
- this.protocolContext = protocolContext;
- }
-
- @Override
- public void setBulletinRepository(final BulletinRepository bulletinRepository) {
- this.bulletinRepository = bulletinRepository;
- }
-
- @Override
- public void start() throws IOException {
-
- if(super.isRunning()) {
- throw new IllegalStateException("Instance is already started.");
- }
-
- super.start();
- }
-
- @Override
- public void stop() throws IOException {
-
- if(super.isRunning() == false) {
- throw new IOException("Instance is already stopped.");
- }
-
- super.stop();
-
- }
-
- @Override
- public Collection<ProtocolHandler> getHandlers() {
- return Collections.unmodifiableCollection(handlers);
- }
-
- @Override
- public void addHandler(final ProtocolHandler handler) {
- if(handler == null) {
- throw new NullPointerException("Protocol handler may not be null.");
- }
- handlers.add(handler);
- }
-
- @Override
- public boolean removeHandler(final ProtocolHandler handler) {
- return handlers.remove(handler);
- }
-
- @Override
- public void dispatchRequest(final Socket socket) {
- byte[] receivedMessage = null;
- String hostname = null;
- final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message
- try {
- final StopWatch stopWatch = new StopWatch(true);
- hostname = socket.getInetAddress().getHostName();
- final String requestId = UUID.randomUUID().toString();
- logger.info("Received request {} from {}", requestId, hostname);
-
- String requestorDn = null;
- if ( socket instanceof SSLSocket ) {
- final SSLSocket sslSocket = (SSLSocket) socket;
- try {
- final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
- if ( certChains != null && certChains.length > 0 ) {
- requestorDn = certChains[0].getSubjectDN().getName();
- }
- } catch (final ProtocolException pe) {
- throw pe;
- } catch (final Exception e) {
- throw new ProtocolException(e);
- }
- }
-
- // unmarshall message
- final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
- final InputStream inStream = socket.getInputStream();
- final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB
- logger.debug("Request {} has a message length of {}", requestId, copyingInputStream.getNumberOfBytesCopied());
-
- final ProtocolMessage request;
- try {
- request = unmarshaller.unmarshal(copyingInputStream);
- } finally {
- receivedMessage = copyingInputStream.getBytesRead();
- }
-
- request.setRequestorDN(requestorDn);
-
- // dispatch message to handler
- ProtocolHandler desiredHandler = null;
- for (final ProtocolHandler handler : getHandlers()) {
- if (handler.canHandle(request)) {
- desiredHandler = handler;
- break;
- }
- }
-
- // if no handler found, throw exception; otherwise handle request
- if (desiredHandler == null) {
- throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
- } else {
- final ProtocolMessage response = desiredHandler.handle(request);
- if(response != null) {
- try {
- logger.debug("Sending response for request {}", requestId);
-
- // marshal message to output stream
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(response, socket.getOutputStream());
- } catch (final IOException ioe) {
- throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to " + ioe, ioe);
- }
- }
- }
-
- stopWatch.stop();
- logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS));
- } catch (final IOException e) {
- logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
-
- if ( bulletinRepository != null ) {
- final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
- bulletinRepository.addBulletin(bulletin);
- }
- } catch (final ProtocolException e) {
- logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
- if ( bulletinRepository != null ) {
- final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
- bulletinRepository.addBulletin(bulletin);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
deleted file mode 100644
index bc68630..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-
-/**
- * Implements a context for communicating internally amongst the cluster using
- * JAXB.
- *
- * @param <T> The type of protocol message.
- *
- * @author unattributed
- */
-public class JaxbProtocolContext<T> implements ProtocolContext {
-
- private static final int BUF_SIZE = (int) Math.pow(2, 10); // 1k
-
- /*
- * A sentinel is used to detect corrupted messages. Relying on the integrity
- * of the message size can cause memory issues if the value is corrupted
- * and equal to a number larger than the memory size.
- */
- private static final byte MESSAGE_PROTOCOL_START_SENTINEL = 0x5A;
-
- private final JAXBContext jaxbCtx;
-
- public JaxbProtocolContext(final JAXBContext jaxbCtx) {
- this.jaxbCtx = jaxbCtx;
- }
-
- @Override
- public ProtocolMessageMarshaller<T> createMarshaller() {
- return new ProtocolMessageMarshaller<T>() {
-
- @Override
- public void marshal(final T msg, final OutputStream os) throws IOException {
-
- try {
-
- // marshal message to output stream
- final Marshaller marshaller = jaxbCtx.createMarshaller();
- final ByteArrayOutputStream msgBytes = new ByteArrayOutputStream();
- marshaller.marshal(msg, msgBytes);
-
- final DataOutputStream dos = new DataOutputStream(os);
-
- // write message protocol sentinel
- dos.write(MESSAGE_PROTOCOL_START_SENTINEL);
-
- // write message size in bytes
- dos.writeInt(msgBytes.size());
-
- // write message
- dos.write(msgBytes.toByteArray());
-
- dos.flush();
-
- } catch (final JAXBException je) {
- throw new IOException("Failed marshalling protocol message due to: " + je, je);
- }
-
- }
- };
- }
-
- @Override
- public ProtocolMessageUnmarshaller<T> createUnmarshaller() {
- return new ProtocolMessageUnmarshaller<T>() {
-
- @Override
- public T unmarshal(final InputStream is) throws IOException {
-
- try {
-
- final DataInputStream dis = new DataInputStream(is);
-
- // check for the presence of the message protocol sentinel
- final byte sentinel = (byte) dis.read();
- if ( sentinel == -1 ) {
- throw new EOFException();
- }
-
- if(MESSAGE_PROTOCOL_START_SENTINEL != sentinel) {
- throw new IOException("Failed reading protocol message due to malformed header");
- }
-
- // read the message size
- final int msgBytesSize = dis.readInt();
-
- // read the message
- final ByteBuffer buffer = ByteBuffer.allocate(msgBytesSize);
- int totalBytesRead = 0;
- do {
- final int bytesToRead;
- if ((msgBytesSize - totalBytesRead) >= BUF_SIZE) {
- bytesToRead = BUF_SIZE;
- } else {
- bytesToRead = msgBytesSize - totalBytesRead;
- }
- totalBytesRead += dis.read(buffer.array(), totalBytesRead, bytesToRead);
- } while (totalBytesRead < msgBytesSize);
-
- // unmarshall message and return
- final Unmarshaller unmarshaller = jaxbCtx.createUnmarshaller();
- final byte[] msg = new byte[totalBytesRead];
- buffer.get(msg);
- return (T) unmarshaller.unmarshal(new ByteArrayInputStream(msg));
-
- } catch (final JAXBException je) {
- throw new IOException("Failed unmarshalling protocol message due to: " + je, je);
- }
-
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
deleted file mode 100644
index d9de24f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * @author unattributed
- */
-public class AdaptedConnectionRequest {
-
- private NodeIdentifier nodeIdentifier;
-
- public AdaptedConnectionRequest() {}
-
- @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
- public NodeIdentifier getNodeIdentifier() {
- return nodeIdentifier;
- }
-
- public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
- this.nodeIdentifier = nodeIdentifier;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
deleted file mode 100644
index c7c783b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-
-/**
- * @author unattributed
- */
-public class AdaptedConnectionResponse {
-
- private StandardDataFlow dataFlow;
- private NodeIdentifier nodeIdentifier;
- private boolean blockedByFirewall;
- private boolean primary;
- private int tryLaterSeconds;
- private Integer managerRemoteInputPort;
- private Boolean managerRemoteCommsSecure;
- private String instanceId;
-
- public AdaptedConnectionResponse() {}
-
- @XmlJavaTypeAdapter(DataFlowAdapter.class)
- public StandardDataFlow getDataFlow() {
- return dataFlow;
- }
-
- public void setDataFlow(StandardDataFlow dataFlow) {
- this.dataFlow = dataFlow;
- }
-
- @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
- public NodeIdentifier getNodeIdentifier() {
- return nodeIdentifier;
- }
-
- public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
- this.nodeIdentifier = nodeIdentifier;
- }
-
- public int getTryLaterSeconds() {
- return tryLaterSeconds;
- }
-
- public void setTryLaterSeconds(int tryLaterSeconds) {
- this.tryLaterSeconds = tryLaterSeconds;
- }
-
- public boolean isBlockedByFirewall() {
- return blockedByFirewall;
- }
-
- public void setBlockedByFirewall(boolean blockedByFirewall) {
- this.blockedByFirewall = blockedByFirewall;
- }
-
- public boolean isPrimary() {
- return primary;
- }
-
- public void setPrimary(boolean primary) {
- this.primary = primary;
- }
-
- public boolean shouldTryLater() {
- return tryLaterSeconds > 0;
- }
-
- public void setManagerRemoteInputPort(Integer managerRemoteInputPort) {
- this.managerRemoteInputPort = managerRemoteInputPort;
- }
-
- public Integer getManagerRemoteInputPort() {
- return managerRemoteInputPort;
- }
-
- public void setManagerRemoteCommsSecure(Boolean secure) {
- this.managerRemoteCommsSecure = secure;
- }
-
- public Boolean isManagerRemoteCommsSecure() {
- return managerRemoteCommsSecure;
- }
-
- public void setInstanceId(String instanceId) {
- this.instanceId = instanceId;
- }
-
- public String getInstanceId() {
- return instanceId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
deleted file mode 100644
index 89d903b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-/**
- * @author unattributed
- */
-public class AdaptedCounter {
-
- private String groupName;
-
- private String name;
-
- private long value;
-
- public AdaptedCounter() {}
-
- public String getGroupName() {
- return groupName;
- }
-
- public void setGroupName(String counterGroupName) {
- this.groupName = counterGroupName;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String counterName) {
- this.name = counterName;
- }
-
- public long getValue() {
- return value;
- }
-
- public void setValue(long value) {
- this.value = value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
deleted file mode 100644
index bb97619..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-/**
- * @author unattributed
- */
-public class AdaptedDataFlow {
-
- private byte[] flow;
- private byte[] templates;
- private byte[] snippets;
-
- private boolean autoStartProcessors;
-
- public AdaptedDataFlow() {}
-
- public byte[] getFlow() {
- return flow;
- }
-
- public void setFlow(byte[] flow) {
- this.flow = flow;
- }
-
- public byte[] getTemplates() {
- return templates;
- }
-
- public void setTemplates(byte[] templates) {
- this.templates = templates;
- }
-
- public byte[] getSnippets() {
- return snippets;
- }
-
- public void setSnippets(byte[] snippets) {
- this.snippets = snippets;
- }
-
- public boolean isAutoStartProcessors() {
- return autoStartProcessors;
- }
-
- public void setAutoStartProcessors(boolean runningAllProcessors) {
- this.autoStartProcessors = runningAllProcessors;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
deleted file mode 100644
index 5b9d9b7..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * @author unattributed
- */
-public class AdaptedHeartbeat {
-
- private NodeIdentifier nodeIdentifier;
- private byte[] payload;
- private boolean primary;
- private boolean connected;
-
- public AdaptedHeartbeat() {}
-
- @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
- public NodeIdentifier getNodeIdentifier() {
- return nodeIdentifier;
- }
-
- public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
- this.nodeIdentifier = nodeIdentifier;
- }
-
- public boolean isPrimary() {
- return primary;
- }
-
- public void setPrimary(boolean primary) {
- this.primary = primary;
- }
-
- public boolean isConnected() {
- return connected;
- }
-
- public void setConnected(boolean connected) {
- this.connected = connected;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public void setPayload(byte[] payload) {
- this.payload = payload;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
deleted file mode 100644
index 98e2438..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * @author unattributed
- */
-public class AdaptedNodeBulletins {
-
- private NodeIdentifier nodeIdentifier;
-
- private byte[] payload;
-
- public AdaptedNodeBulletins() {}
-
- @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
- public NodeIdentifier getNodeIdentifier() {
- return nodeIdentifier;
- }
-
- public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
- this.nodeIdentifier = nodeIdentifier;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public void setPayload(byte[] payload) {
- this.payload = payload;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
deleted file mode 100644
index 8134ea3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-/**
- * @author unattributed
- */
-public class AdaptedNodeIdentifier {
-
- private String id;
-
- private String apiAddress;
-
- private int apiPort;
-
- private String socketAddress;
-
- private int socketPort;
-
- public AdaptedNodeIdentifier() {}
-
- public String getApiAddress() {
- return apiAddress;
- }
-
- public void setApiAddress(String apiAddress) {
- this.apiAddress = apiAddress;
- }
-
- public int getApiPort() {
- return apiPort;
- }
-
- public void setApiPort(int apiPort) {
- this.apiPort = apiPort;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getSocketAddress() {
- return socketAddress;
- }
-
- public void setSocketAddress(String socketAddress) {
- this.socketAddress = socketAddress;
- }
-
- public int getSocketPort() {
- return socketPort;
- }
-
- public void setSocketPort(int socketPort) {
- this.socketPort = socketPort;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
deleted file mode 100644
index 1f91cf1..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-
-/**
- * @author unattributed
- */
-public class ConnectionRequestAdapter extends XmlAdapter<AdaptedConnectionRequest, ConnectionRequest> {
-
- @Override
- public AdaptedConnectionRequest marshal(final ConnectionRequest cr) {
- final AdaptedConnectionRequest aCr = new AdaptedConnectionRequest();
- if(cr != null) {
- aCr.setNodeIdentifier(cr.getProposedNodeIdentifier());
- }
- return aCr;
- }
-
- @Override
- public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) {
- return new ConnectionRequest(aCr.getNodeIdentifier());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
deleted file mode 100644
index 143bab0..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-
-/**
- * @author unattributed
- */
-public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionResponse, ConnectionResponse> {
-
- @Override
- public AdaptedConnectionResponse marshal(final ConnectionResponse cr) {
- final AdaptedConnectionResponse aCr = new AdaptedConnectionResponse();
- if(cr != null) {
- aCr.setDataFlow(cr.getDataFlow());
- aCr.setNodeIdentifier(cr.getNodeIdentifier());
- aCr.setTryLaterSeconds(cr.getTryLaterSeconds());
- aCr.setBlockedByFirewall(cr.isBlockedByFirewall());
- aCr.setPrimary(cr.isPrimary());
- aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort());
- aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure());
- aCr.setInstanceId(cr.getInstanceId());
- }
- return aCr;
- }
-
- @Override
- public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) {
- if(aCr.shouldTryLater()) {
- return new ConnectionResponse(aCr.getTryLaterSeconds());
- } else if(aCr.isBlockedByFirewall()) {
- return ConnectionResponse.createBlockedByFirewallResponse();
- } else {
- return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(), aCr.isPrimary(),
- aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
deleted file mode 100644
index 8d9467f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-
-/**
- * @author unattributed
- */
-public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlow> {
-
- @Override
- public AdaptedDataFlow marshal(final StandardDataFlow df) {
-
- final AdaptedDataFlow aDf = new AdaptedDataFlow();
-
- if(df != null) {
- aDf.setFlow(df.getFlow());
- aDf.setTemplates(df.getTemplates());
- aDf.setSnippets(df.getSnippets());
- aDf.setAutoStartProcessors(df.isAutoStartProcessors());
- }
-
- return aDf;
- }
-
- @Override
- public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) {
- final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getTemplates(), aDf.getSnippets());
- dataFlow.setAutoStartProcessors(aDf.isAutoStartProcessors());
- return dataFlow;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
deleted file mode 100644
index 0e073b6..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-
-/**
- * @author unattributed
- */
-public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> {
-
- @Override
- public AdaptedHeartbeat marshal(final Heartbeat hb) {
-
- final AdaptedHeartbeat aHb = new AdaptedHeartbeat();
-
- if(hb != null) {
- // set node identifier
- aHb.setNodeIdentifier(hb.getNodeIdentifier());
-
- // set payload
- aHb.setPayload(hb.getPayload());
-
- // set leader flag
- aHb.setPrimary(hb.isPrimary());
-
- // set connected flag
- aHb.setConnected(hb.isConnected());
- }
-
- return aHb;
- }
-
- @Override
- public Heartbeat unmarshal(final AdaptedHeartbeat aHb) {
- return new Heartbeat(aHb.getNodeIdentifier(), aHb.isPrimary(), aHb.isConnected(), aHb.getPayload());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
deleted file mode 100644
index c3a57f5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-
-/**
- * @author unattributed
- */
-public final class JaxbProtocolUtils {
-
- public static final String JAXB_CONTEXT_PATH = ObjectFactory.class.getPackage().getName();
-
- public static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
-
- /**
- * Load the JAXBContext version.
- */
- private static JAXBContext initializeJaxbContext() {
- try {
- return JAXBContext.newInstance(JAXB_CONTEXT_PATH);
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXBContext.", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
deleted file mode 100644
index 1ae41f7..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
-
-/**
- * @author unattributed
- */
-public class NodeBulletinsAdapter extends XmlAdapter<AdaptedNodeBulletins, NodeBulletins> {
-
- @Override
- public AdaptedNodeBulletins marshal(final NodeBulletins hb) {
-
- final AdaptedNodeBulletins adaptedBulletins = new AdaptedNodeBulletins();
-
- if(hb != null) {
- // set node identifier
- adaptedBulletins.setNodeIdentifier(hb.getNodeIdentifier());
-
- // set payload
- adaptedBulletins.setPayload(hb.getPayload());
- }
-
- return adaptedBulletins;
- }
-
- @Override
- public NodeBulletins unmarshal(final AdaptedNodeBulletins adaptedBulletins) {
- return new NodeBulletins(adaptedBulletins.getNodeIdentifier(), adaptedBulletins.getPayload());
- }
-
-}