Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:37 UTC

[14/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
new file mode 100644
index 0000000..702f081
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.nifi.cluster.flow.ClusterDataFlow;
+import org.apache.nifi.cluster.flow.DaoException;
+import org.apache.nifi.cluster.flow.DataFlowDao;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.io.ByteArrayInputStream;
+import org.apache.nifi.io.StreamUtils;
+import org.apache.nifi.logging.NiFiLog;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Implements the DataFlowDao interface. The implementation tracks the state of
+ * the dataflow by annotating the filename of the flow state file. Specifically,
+ * the implementation correlates PersistedFlowState states to filename
+ * extensions. The correlation is as follows:
+ * <ul>
+ * <li> CURRENT maps to flow.tar </li>
+ * <li> STALE maps to flow.tar.stale </li>
+ * <li> UNKNOWN maps to flow.tar.unknown </li>
+ * </ul>
+ * Whenever the flow state changes, the flow state file's name is updated to
+ * denote its state.
+ *
+ * The implementation also provides for a restore directory that may be
+ * configured for higher availability. At instance creation, if the primary or
+ * restore directories have multiple flow state files, an exception is thrown.
+ * If the primary directory has a current flow state file, but the restore
+ * directory does not, then the primary flow state file is copied to the restore
+ * directory. If the restore directory has a current flow state file, but the
+ * primary directory does not, then the restore flow state file is copied to the
+ * primary directory. If both the primary and restore directories have a current
+ * flow state file and the files are different, then an exception is thrown.
+ *
+ * When the flow state file is saved, it is always saved first to the restore
+ * directory followed by a save to the primary directory. When the flow state
+ * file is loaded, a check is made to verify that the primary and restore flow
+ * state files are both current. If either is not current, then an exception is
+ * thrown. The primary flow state file is always read when the load method is
+ * called.
+ *
+ * @author unattributed
+ */
+public class DataFlowDaoImpl implements DataFlowDao {
+
+    private final File primaryDirectory;
+    private final File restoreDirectory;
+    private final boolean autoStart;
+    private final String generatedRootGroupId = UUID.randomUUID().toString();
+
+    public static final String STALE_EXT = ".stale";
+    public static final String UNKNOWN_EXT = ".unknown";
+    public static final String FLOW_PACKAGE = "flow.tar";
+    public static final String FLOW_XML_FILENAME = "flow.xml";
+    public static final String TEMPLATES_FILENAME = "templates.xml";
+    public static final String SNIPPETS_FILENAME = "snippets.xml";
+    public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
+
+    public DataFlowDaoImpl(final File primaryDirectory) throws DaoException {
+        this(primaryDirectory, null, false);
+    }
+
+    public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException {
+
+        // sanity check that primary directory is a directory, creating it if necessary
+        if (primaryDirectory == null) {
+            throw new IllegalArgumentException("Primary directory may not be null.");
+        } else if (!primaryDirectory.exists()) {
+            if (!primaryDirectory.mkdir()) {
+                throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath()));
+            }
+        } else if (!primaryDirectory.isDirectory()) {
+            throw new IllegalArgumentException("Primary directory must be a directory.");
+        }
+
+        this.autoStart = autoStart;
+
+        try {
+            this.primaryDirectory = primaryDirectory;
+            this.restoreDirectory = restoreDirectory;
+
+            if (restoreDirectory == null) {
+                // check that we have exactly one current flow state file
+                ensureSingleCurrentStateFile(primaryDirectory);
+            } else {
+
+                // check that restore directory is a directory, creating it if necessary
+                FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
+
+                // check that restore directory is not the same as the primary directory
+                if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
+                    throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ",
+                            primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+                }
+
+                final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory);
+                final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory);
+
+                // if more than one state file in either primary or restore, then throw exception
+                if (primaryFlowStateFiles.length > 1) {
+                    throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory));
+                } else if (restoreFlowStateFiles.length > 1) {
+                    throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory));
+                }
+
+                // check that the single primary state file we found is current or create a new one
+                final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory);
+
+                // check that the single restore state file we found is current or create a new one
+                final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory);
+
+                // if there was a difference in flow state file directories, then copy the appropriate files
+                if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) {
+                    // copy primary state file to restore
+                    FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger);
+                } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) {
+                    // copy restore state file to primary
+                    FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger);
+                } else {
+                    // sync the primary copy with the restore copy
+                    FileUtils.syncWithRestore(primaryFlowStateFile, restoreFlowStateFile, logger);
+                }
+
+            }
+        } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) {
+            throw new DaoException(ex);
+        }
+
+    }
+
+    @Override
+    public ClusterDataFlow loadDataFlow() throws DaoException {
+        try {
+            return parseDataFlow(getExistingFlowStateFile(primaryDirectory));
+        } catch (final IOException | JAXBException ex) {
+            throw new DaoException(ex);
+        }
+    }
+
+    @Override
+    public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException {
+        try {
+
+            final File primaryStateFile = getFlowStateFile(primaryDirectory);
+
+            // write to restore before writing to primary in case primary experiences problems
+            if (restoreDirectory != null) {
+                final File restoreStateFile = getFlowStateFile(restoreDirectory);
+                if (restoreStateFile == null) {
+                    if (primaryStateFile == null) {
+                        writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
+                    } else {
+                        throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'",
+                                primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+                    }
+                } else {
+                    if (primaryStateFile == null) {
+                        throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'",
+                                restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath()));
+                    } else {
+                        final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile);
+                        final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile);
+                        if (primaryFlowState == restoreFlowState) {
+                            writeDataFlow(restoreStateFile, dataFlow);
+                        } else {
+                            throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
+                                    primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
+                        }
+                    }
+                }
+            }
+
+            // write dataflow to primary 
+            if (primaryStateFile == null) {
+                writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow);
+            } else {
+                writeDataFlow(primaryStateFile, dataFlow);
+            }
+
+        } catch (final IOException | JAXBException ex) {
+            throw new DaoException(ex);
+        }
+    }
+
+    @Override
+    public PersistedFlowState getPersistedFlowState() {
+        // trust restore over primary if configured for restore
+        if (restoreDirectory == null) {
+            return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory));
+        } else {
+            return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory));
+        }
+    }
+
+    @Override
+    public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException {
+        // rename restore before primary if configured for restore
+        if (restoreDirectory != null) {
+            renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState);
+        }
+        renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState);
+    }
+
+    private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException {
+
+        // ensure that we have at most one state file and if we have one, it is current
+        final File[] directoryFlowStateFiles = getFlowStateFiles(dir);
+        if (directoryFlowStateFiles.length > 1) {
+            throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir));
+        } else if (directoryFlowStateFiles.length == 0) {
+            // create a new file if none exist
+            return createNewFlowStateFile(dir);
+        } else {
+            // check that the single flow state file is current
+            final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]);
+            if (PersistedFlowState.CURRENT == flowState) {
+                return directoryFlowStateFiles[0];
+            } else {
+                throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath()));
+            }
+        }
+
+    }
+
+    private PersistedFlowState getPersistedFlowState(final File file) {
+        final String path = file.getAbsolutePath();
+        if (path.endsWith(STALE_EXT)) {
+            return PersistedFlowState.STALE;
+        } else if (path.endsWith(UNKNOWN_EXT)) {
+            return PersistedFlowState.UNKNOWN;
+        } else {
+            return PersistedFlowState.CURRENT;
+        }
+    }
+
+    private File getFlowStateFile(final File dir) {
+        final File[] files = getFlowStateFiles(dir);
+        if (files.length > 1) {
+            throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length));
+        } else if (files.length == 0) {
+            return null;
+        } else {
+            return files[0];
+        }
+    }
+
+    private File getExistingFlowStateFile(final File dir) {
+        final File file = getFlowStateFile(dir);
+        if (file == null) {
+            throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath()));
+        }
+        return file;
+    }
+
+    private File[] getFlowStateFiles(final File dir) {
+        final File[] files = dir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT));
+            }
+        });
+
+        if (files == null) {
+            return new File[0];
+        } else {
+            return files;
+        }
+    }
+
+    private File removeStateFileExtension(final File file) {
+
+        final String path = file.getAbsolutePath();
+        final int stateFileExtIndex;
+        if (path.endsWith(STALE_EXT)) {
+            stateFileExtIndex = path.lastIndexOf(STALE_EXT);
+        } else if (path.endsWith(UNKNOWN_EXT)) {
+            stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT);
+        } else {
+            stateFileExtIndex = path.length();
+        }
+
+        return new File(path.substring(0, stateFileExtIndex));
+    }
+
+    private File addStateFileExtension(final File file, final PersistedFlowState state) {
+        switch (state) {
+            case CURRENT: {
+                return file;
+            }
+            case STALE: {
+                return new File(file.getAbsolutePath() + STALE_EXT);
+            }
+            case UNKNOWN: {
+                return new File(file.getAbsolutePath() + UNKNOWN_EXT);
+            }
+            default: {
+                throw new RuntimeException("Unsupported PersistedFlowState Enum value: " + state);
+            }
+        }
+    }
+
+    private File createNewFlowStateFile(final File dir) throws IOException, JAXBException {
+        final File stateFile = new File(dir, FLOW_PACKAGE);
+        stateFile.createNewFile();
+
+        final byte[] flowBytes = getEmptyFlowBytes();
+        final byte[] templateBytes = new byte[0];
+        final byte[] snippetBytes = new byte[0];
+        final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
+
+        final ClusterMetadata clusterMetadata = new ClusterMetadata();
+        writeDataFlow(stateFile, dataFlow, clusterMetadata);
+
+        return stateFile;
+    }
+
+    private byte[] getEmptyFlowBytes() throws IOException {
+        try {
+            final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+            final Document document = docBuilder.newDocument();
+
+            final Element controller = document.createElement("flowController");
+            document.appendChild(controller);
+
+            controller.appendChild(createTextElement(document, "maxThreadCount", "15"));
+
+            final Element rootGroup = document.createElement("rootGroup");
+            rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId));
+            rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow"));
+
+            // create the position element
+            final Element positionElement = createTextElement(document, "position", "");
+            positionElement.setAttribute("x", "0.0");
+            positionElement.setAttribute("y", "0.0");
+            rootGroup.appendChild(positionElement);
+
+            rootGroup.appendChild(createTextElement(document, "comment", ""));
+            controller.appendChild(rootGroup);
+
+            final Transformer transformer = TransformerFactory.newInstance().newTransformer();
+            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+            final DOMSource source = new DOMSource(document);
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            final StreamResult result = new StreamResult(baos);
+            transformer.transform(source, result);
+
+            return baos.toByteArray();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private Element createTextElement(final Document document, final String elementName, final String value) {
+        final Element element = document.createElement(elementName);
+        element.setTextContent(value);
+        return element;
+    }
+
+    private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException {
+        final PersistedFlowState existingState = getPersistedFlowState(flowStateFile);
+        if (existingState != newState) {
+            final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState);
+            if (flowStateFile.renameTo(newFlowStateFile) == false) {
+                throw new DaoException(
+                        String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath()));
+            }
+        }
+    }
+
+    private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException {
+        byte[] flowBytes = new byte[0];
+        byte[] templateBytes = new byte[0];
+        byte[] snippetBytes = new byte[0];
+        byte[] clusterInfoBytes = new byte[0];
+
+        try (final InputStream inStream = new FileInputStream(file);
+                final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
+            TarArchiveEntry tarEntry;
+            while ((tarEntry = tarIn.getNextTarEntry()) != null) {
+                switch (tarEntry.getName()) {
+                    case FLOW_XML_FILENAME:
+                        flowBytes = new byte[(int) tarEntry.getSize()];
+                        StreamUtils.fillBuffer(tarIn, flowBytes, true);
+                        break;
+                    case TEMPLATES_FILENAME:
+                        templateBytes = new byte[(int) tarEntry.getSize()];
+                        StreamUtils.fillBuffer(tarIn, templateBytes, true);
+                        break;
+                    case SNIPPETS_FILENAME:
+                        snippetBytes = new byte[(int) tarEntry.getSize()];
+                        StreamUtils.fillBuffer(tarIn, snippetBytes, true);
+                        break;
+                    case CLUSTER_INFO_FILENAME:
+                        clusterInfoBytes = new byte[(int) tarEntry.getSize()];
+                        StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
+                        break;
+                    default:
+                        throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
+                }
+            }
+        }
+
+        final ClusterMetadata clusterMetadata;
+        if (clusterInfoBytes.length == 0) {
+            clusterMetadata = null;
+        } else {
+            final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller();
+            clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes));
+        }
+
+        final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
+        dataFlow.setAutoStartProcessors(autoStart);
+
+        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
+    }
+
+    private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
+
+        // get the data flow
+        DataFlow dataFlow = clusterDataFlow.getDataFlow();
+
+        // if no dataflow, then write a new dataflow
+        if (dataFlow == null) {
+            dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
+        }
+
+        // setup the cluster metadata
+        final ClusterMetadata clusterMetadata = new ClusterMetadata();
+        clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
+
+        // write to disk
+        writeDataFlow(file, dataFlow, clusterMetadata);
+    }
+
+    private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
+        final TarArchiveEntry flowEntry = new TarArchiveEntry(filename);
+        flowEntry.setSize(bytes.length);
+        tarOut.putArchiveEntry(flowEntry);
+        tarOut.write(bytes);
+        tarOut.closeArchiveEntry();
+    }
+
+    private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
+
+        try (final OutputStream fos = new FileOutputStream(file);
+                final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
+
+            writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
+            writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
+            writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
+
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
+            writeClusterMetadata(clusterMetadata, baos);
+            final byte[] clusterInfoBytes = baos.toByteArray();
+
+            writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes);
+        }
+    }
+
+    private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException {
+        // write cluster metadata to output stream
+        final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller();
+        marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+        marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
+        marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
+        marshaller.marshal(clusterMetadata, os);
+    }
+
+    @XmlRootElement(name = "clusterMetadata")
+    private static class ClusterMetadata {
+
+        private NodeIdentifier primaryNodeId;
+
+        private static final JAXBContext jaxbCtx;
+
+        static {
+            try {
+                jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class);
+            } catch (final JAXBException je) {
+                throw new RuntimeException(je);
+            }
+        }
+
+        @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+        public NodeIdentifier getPrimaryNodeId() {
+            return primaryNodeId;
+        }
+
+        public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) {
+            this.primaryNodeId = primaryNodeId;
+        }
+    }
+}
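
For reference, a minimal usage sketch of the DAO above; the directory paths
are hypothetical, but the constructor and method calls follow the signatures
in this diff. A CURRENT flow is stored as flow.tar, and state changes rename
that file in place.

    import java.io.File;

    import org.apache.nifi.cluster.flow.ClusterDataFlow;
    import org.apache.nifi.cluster.flow.PersistedFlowState;
    import org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl;

    public class DataFlowDaoExample {
        public static void main(final String[] args) {
            // hypothetical primary and restore directories
            final File primary = new File("/opt/nifi/conf/primary");
            final File restore = new File("/opt/nifi/conf/restore");

            // creates a current flow.tar in each directory if none exists;
            // throws DaoException if either directory holds multiple state files
            final DataFlowDaoImpl dao = new DataFlowDaoImpl(primary, restore, false);

            // loading always reads the primary copy
            final ClusterDataFlow clusterFlow = dao.loadDataFlow();

            // renames flow.tar to flow.tar.stale (restore first, then primary)
            dao.setPersistedFlowState(PersistedFlowState.STALE);

            // saving writes the restore copy before the primary copy
            dao.saveDataFlow(clusterFlow);
        }
    }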

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
new file mode 100644
index 0000000..e135af3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow.impl;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.cluster.flow.ClusterDataFlow;
+import org.apache.nifi.cluster.flow.DaoException;
+import org.apache.nifi.cluster.flow.DataFlowDao;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.FormatUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the DataFlowManagementService interface. The service tries to keep
+ * the cluster's flow current with respect to the available nodes.
+ *
+ * The instance may be configured with a retrieval delay, which will reduce the
+ * number of retrievals performed by the service at the expense of increasing
+ * the chances that the service will not be able to provide a current flow to
+ * the caller.
+ *
+ * By default, the service will try to update the flow as quickly as possible.
+ * Configuring a delay enables a less aggressive retrieval strategy.
+ * Specifically, the eligible retrieval time is reset every time the flow state
+ * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow
+ * will not be retrieved.
+ *
+ * @author unattributed
+ */
+public class DataFlowManagementServiceImpl implements DataFlowManagementService {
+
+    /*
+     * Developer Note: 
+     * 
+     * This class maintains a Timer and a TimerTask.
+     * Although the class is not externally threadsafe, its internals are protected to
+     * accommodate multithreaded access between the service's callers and the TimerTask.
+     * 
+     */
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
+
+    private final DataFlowDao flowDao;
+
+    private final ClusterManagerProtocolSender sender;
+
+    private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>();
+
+    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
+
+    private final AtomicLong lastRetrievalTime = new AtomicLong(-1);
+
+    private Timer flowRetriever;
+
+    private long retrievableAfterTime = 0L;
+
+    private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0);
+
+    private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock());
+
+    public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) {
+        if (flowDao == null) {
+            throw new IllegalArgumentException("Flow DAO may not be null.");
+        } else if (sender == null) {
+            throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
+        }
+        this.flowDao = flowDao;
+        this.sender = sender;
+    }
+
+    @Override
+    public void start() {
+
+        if (isRunning()) {
+            throw new IllegalArgumentException("Instance is already running.");
+        }
+
+        // reset stop requested
+        stopRequested.set(false);
+
+        // set up flow retriever timer
+        flowRetriever = new Timer("Flow Management Service", /* is daemon */ true);
+        flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500);
+    }
+
+    @Override
+    public boolean isRunning() {
+        return (flowRetriever != null);
+    }
+
+    @Override
+    public void stop() {
+
+        if (isRunning() == false) {
+            throw new IllegalArgumentException("Instance is already stopped.");
+        }
+
+        // record stop request
+        stopRequested.set(true);
+
+        flowRetriever.cancel();
+        flowRetriever = null;
+
+    }
+
+    @Override
+    public ClusterDataFlow loadDataFlow() throws DaoException {
+        resourceLock.lock();
+        try {
+            return flowDao.loadDataFlow();
+        } finally {
+            resourceLock.unlock("loadDataFlow");
+        }
+    }
+
+    @Override
+    public void updatePrimaryNode(final NodeIdentifier nodeId) {
+        resourceLock.lock();
+        try {
+            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+            final StandardDataFlow dataFlow;
+            if (existingClusterDataFlow == null) {
+                dataFlow = null;
+            } else {
+                dataFlow = existingClusterDataFlow.getDataFlow();
+            }
+
+            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
+        } finally {
+            resourceLock.unlock("updatePrimaryNode");
+        }
+    }
+
+    @Override
+    public PersistedFlowState getPersistedFlowState() {
+        resourceLock.lock();
+        try {
+            return flowDao.getPersistedFlowState();
+        } finally {
+            resourceLock.unlock("getPersistedFlowState");
+        }
+    }
+
+    @Override
+    public boolean isFlowCurrent() {
+        return PersistedFlowState.CURRENT == getPersistedFlowState();
+    }
+
+    @Override
+    public void setPersistedFlowState(final PersistedFlowState flowState) {
+        // lock to ensure state change and retrievable time update are atomic
+        resourceLock.lock();
+        try {
+            flowDao.setPersistedFlowState(flowState);
+            if (PersistedFlowState.STALE == flowState) {
+                retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000);
+            } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) {
+                retrievableAfterTime = Long.MAX_VALUE;
+            }
+        } finally {
+            resourceLock.unlock("setPersistedFlowState");
+        }
+    }
+
+    @Override
+    public Set<NodeIdentifier> getNodeIds() {
+        return Collections.unmodifiableSet(nodeIds);
+    }
+
+    @Override
+    public void setNodeIds(final Set<NodeIdentifier> nodeIds) {
+
+        if (nodeIds == null) {
+            throw new IllegalArgumentException("Node IDs may not be null.");
+        }
+
+        resourceLock.lock();
+        try {
+
+            if (this.nodeIds.equals(nodeIds)) {
+                return;
+            }
+
+            this.nodeIds.clear();
+            this.nodeIds.addAll(nodeIds);
+
+        } finally {
+            resourceLock.unlock("setNodeIds");
+        }
+
+    }
+
+    @Override
+    public int getRetrievalDelaySeconds() {
+        return retrievalDelaySeconds.get();
+    }
+
+    @Override
+    public void setRetrievalDelay(final String retrievalDelay) {
+        this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS));
+    }
+
+    public ClusterManagerProtocolSender getSender() {
+        return sender;
+    }
+
+    public long getLastRetrievalTime() {
+        return lastRetrievalTime.get();
+    }
+
+    /**
+     * A timer task for issuing FlowRequestMessage messages to nodes to retrieve
+     * an updated flow.
+     */
+    private class FlowRetrieverTimerTask extends TimerTask {
+
+        @Override
+        public void run() {
+
+            resourceLock.lock();
+            try {
+                // if flow is current, then we're done
+                if (isFlowCurrent()) {
+                    return;
+                }
+            } catch (final Exception ex) {
+                logger.info("Encountered exception checking if flow is current caused by " + ex, ex);
+            } finally {
+                resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
+            }
+
+            final FlowRequestMessage request = new FlowRequestMessage();
+            for (final NodeIdentifier nodeId : getNodeIds()) {
+                try {
+                    // setup request
+                    request.setNodeId(nodeId);
+
+                    // record request time
+                    final long requestSentTime = new Date().getTime();
+
+                    resourceLock.lock();
+                    try {
+                        // sanity checks before making request
+                        if (stopRequested.get()) {  // did we receive a stop request
+                            logger.debug("Stopping runnable prematurely because a request to stop was issued.");
+                            return;
+                        } else if (requestSentTime < retrievableAfterTime) {
+                            /*
+                             * Retrievable after time was updated while obtaining
+                             * the lock, so try again later
+                             */
+                            return;
+                        }
+                    } finally {
+                        resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
+                    }
+
+                    // send request
+                    final FlowResponseMessage response = sender.requestFlow(request);
+
+                    resourceLock.lock();
+                    try {
+                        // check if the retrieved flow is still valid
+                        if (requestSentTime > retrievableAfterTime) {
+                            logger.info("Saving retrieved flow.");
+
+                            final StandardDataFlow dataFlow = response.getDataFlow();
+                            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+                            final ClusterDataFlow currentClusterDataFlow;
+                            if (existingClusterDataFlow == null) {
+                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
+                            } else {
+                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
+                            }
+                            flowDao.saveDataFlow(currentClusterDataFlow);
+                            flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
+                            lastRetrievalTime.set(new Date().getTime());
+                        }
+
+                        /*
+                         * Retrievable after time was updated while requesting
+                         * the flow, so try again later.
+                         */
+                    } finally {
+                        resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
+                    }
+
+                } catch (final Throwable t) {
+                    logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t);
+                }
+            }
+        }
+    }
+
+    private static class TimingReentrantLock {
+
+        private final Lock lock;
+        private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock");
+
+        private final ThreadLocal<Long> lockTime = new ThreadLocal<>();
+
+        public TimingReentrantLock(final Lock lock) {
+            this.lock = lock;
+        }
+
+        public void lock() {
+            lock.lock();
+            lockTime.set(System.nanoTime());
+        }
+
+        public void unlock(final String task) {
+            final long nanosLocked = System.nanoTime() - lockTime.get();
+            lock.unlock();
+
+            final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS);
+            if (millisLocked > 100L) {
+                logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task);
+            }
+        }
+    }
+}
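
A sketch of how this service might be wired and driven; the DAO, the protocol
sender, and the node identifier set are assumed to be constructed elsewhere,
but the calls follow the signatures in this diff.

    // flowDao, sender, and nodeIds are assumed to be built elsewhere
    final DataFlowManagementServiceImpl service =
            new DataFlowManagementServiceImpl(flowDao, sender);

    // delay string is parsed by FormatUtils.getTimeDuration, e.g. "5 sec"
    service.setRetrievalDelay("5 sec");

    // the nodes eligible to serve flow requests
    service.setNodeIds(nodeIds);

    // schedules the FlowRetrieverTimerTask to run every 500 ms
    service.start();

    // marking the flow STALE resets the eligible retrieval time; once that
    // time passes, the timer task requests the flow from a node, saves it,
    // and sets the persisted state back to CURRENT
    service.setPersistedFlowState(PersistedFlowState.STALE);

    // ... later, on shutdown
    service.stop();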

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
new file mode 100644
index 0000000..0fcac8c
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
+import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
+import org.apache.nifi.cluster.NodeInformant;
+import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.reporting.BulletinRepository;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Defines the interface for a ClusterManager. The cluster manager is a
+ * threadsafe centralized manager for a cluster. Members of a cluster are nodes.
+ * A member becomes a node by issuing a connection request to the manager. The
+ * manager maintains the set of nodes. Nodes may be disconnected, reconnected,
+ * and deleted.
+ *
+ * Nodes are responsible for sending heartbeats to the manager to indicate their
+ * liveliness. A manager may disconnect a node if it does not receive a
+ * heartbeat within a configurable time period. A cluster manager instance may
+ * be configured with how often to monitor received heartbeats
+ * (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may
+ * elapse between node heartbeats before disconnecting the node
+ * (getMaxHeartbeatGapSeconds()).
+ *
+ * Since only a single node may execute isolated processors, the cluster manager
+ * maintains the notion of a primary node. The primary node is chosen at cluster
+ * startup and retains the role until a user requests a different node to be the
+ * primary node.
+ *
+ * @author unattributed
+ */
+public interface ClusterManager extends NodeInformant {
+
+    /**
+     * Handles a node's heartbeat.
+     *
+     * @param heartbeat a heartbeat
+     *
+     */
+    void handleHeartbeat(Heartbeat heartbeat);
+
+    /**
+     * @param statuses the statuses of the nodes
+     * @return the set of nodes
+     */
+    Set<Node> getNodes(Status... statuses);
+
+    /**
+     * @param nodeId the node identifier
+     * @return the node with the given identifier, or null if the node does
+     * not exist
+     */
+    Node getNode(String nodeId);
+
+    /**
+     * @param statuses the node statuses to filter by
+     * @return the set of node identifiers with the given node status
+     */
+    Set<NodeIdentifier> getNodeIds(Status... statuses);
+
+    /**
+     * Deletes the node with the given node identifier. If the given node is the
+     * primary node, then a subsequent request may be made to the manager to set
+     * a new primary node.
+     *
+     * @param nodeId the node identifier
+     * @param userDn the Distinguished Name of the user requesting the node be
+     * deleted from the cluster
+     *
+     * @throws UnknownNodeException if the node does not exist
+     * @throws IllegalNodeDeletionException if the node is not in a disconnected
+     * state
+     */
+    void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException;
+
+    /**
+     * Requests a connection to the cluster.
+     *
+     * @param request the request
+     *
+     * @return the response
+     */
+    ConnectionResponse requestConnection(ConnectionRequest request);
+
+    /**
+     * Services reconnection requests for a given node. If the node indicates
+     * reconnection failure, then the node will be set to disconnected.
+     * Otherwise, a reconnection request will be sent to the node, initiating
+     * the connection handshake.
+     *
+     * @param nodeId a node identifier
+     * @param userDn the Distinguished Name of the user requesting the
+     * reconnection
+     *
+     * @throws UnknownNodeException if the node does not exist
+     * @throws IllegalNodeReconnectionException if the node is not disconnected
+     */
+    void requestReconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeReconnectionException;
+
+    /**
+     * Requests the node with the given identifier be disconnected.
+     *
+     * @param nodeId the node identifier
+     * @param userDn the Distinguished Name of the user requesting the
+     * disconnection
+     *
+     * @throws UnknownNodeException if the node does not exist
+     * @throws IllegalNodeDisconnectionException if the node cannot be
+     * disconnected due to the cluster's state (e.g., the node is the last
+     * connected node or the node is primary)
+     * @throws NodeDisconnectionException if the disconnection failed
+     */
+    void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException;
+
+    /**
+     * @return the time in seconds to wait between successive executions of
+     * heartbeat monitoring
+     */
+    int getHeartbeatMonitoringIntervalSeconds();
+
+    /**
+     * @return the maximum time in seconds that is allowed between successive
+     * heartbeats of a node before disconnecting the node
+     */
+    int getMaxHeartbeatGapSeconds();
+
+    /**
+     * Returns a list of node events for the node with the given identifier. The
+     * events will be returned in order of most recent to least recent according
+     * to the creation date of the event.
+     *
+     * @param nodeId the node identifier
+     *
+     * @return the list of events or an empty list if no node exists with the
+     * given identifier
+     */
+    List<Event> getNodeEvents(final String nodeId);
+
+    /**
+     * Revokes the primary role from the current primary node and assigns the
+ * primary role to the given node ID.
+     *
+     * If role revocation fails, then the current primary node is set to
+     * disconnected while retaining the primary role and no role assignment is
+     * performed.
+     *
+     * If role assignment fails, then the given node is set to disconnected and
+     * is given the primary role.
+     *
+     * @param nodeId the node identifier
+     * @param userDn the Distinguished Name of the user requesting that the
+     * Primary Node be assigned
+     *
+     * @throws UnknownNodeException if the node with the given identifier does
+     * not exist
+     * @throws IneligiblePrimaryNodeException if the node with the given
+     * identifier is not eligible to be the primary node
+     * @throws PrimaryRoleAssignmentException if the cluster was unable to
+     * change the primary role to the requested node
+     */
+    void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
+
+    /**
+     * @return the primary node of the cluster or null if no primary node exists
+     */
+    Node getPrimaryNode();
+
+    /**
+     * Returns the bulletin repository.
+     *
+     * @return the bulletin repository
+     */
+    BulletinRepository getBulletinRepository();
+
+    /**
+     * Returns a {@link ProcessGroupStatus} that represents the status of all
+     * nodes with the given {@link Status}es for the given ProcessGroup id, or
+     * null if no nodes exist with the given statuses
+     *
+     * @param groupId the ProcessGroup identifier
+     * @return the merged status, or null if no nodes exist with the given
+     * statuses
+     */
+    ProcessGroupStatus getProcessGroupStatus(String groupId);
+
+    /**
+     * Returns a merged representation of the System Diagnostics for all nodes
+     * in the cluster
+     *
+     * @return the merged system diagnostics
+     */
+    SystemDiagnostics getSystemDiagnostics();
+}
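
A sketch of typical calls against this interface; the 'manager' reference,
node identifiers, and user DN are hypothetical.

    // nodes must heartbeat to remain connected
    manager.handleHeartbeat(heartbeat);

    // inspect cluster membership
    final Set<Node> connected = manager.getNodes(Status.CONNECTED);
    final Node primary = manager.getPrimaryNode();

    // a node must be disconnected before it may be deleted
    manager.requestDisconnection(nodeId, userDn);
    manager.deleteNode(nodeId, userDn);

    // reassign the primary role to another eligible node
    manager.setPrimaryNode(otherNodeId, userDn);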

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
new file mode 100644
index 0000000..2cf5812
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
+import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Extends the ClusterManager interface to define how requests issued to the
+ * cluster manager are federated to the nodes. Specifically, the HTTP protocol
+ * is used for communicating requests to the cluster manager and to the nodes.
+ *
+ * @author unattributed
+ */
+public interface HttpClusterManager extends ClusterManager {
+
+    /**
+     * Federates the HTTP request to all connected nodes in the cluster. The
+     * given URI's host and port will not be used and instead will be adjusted
+     * for each node's host and port. The node URIs are guaranteed to be
+     * constructed before issuing any requests, so if a UriConstructionException
+     * is thrown, then it is guaranteed that no request was issued.
+     *
+     * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+     * @param uri the base request URI (up to, but not including, the query
+     * string)
+     * @param parameters the request parameters
+     * @param headers the request headers
+     *
+     * @return the client response
+     *
+     * @throws NoConnectedNodesException if no nodes are connected as a result of
+     * the request
+     * @throws NoResponseFromNodesException if no response could be obtained
+     * @throws UriConstructionException if there was an issue constructing the
+     * URIs tailored for each individual node
+     * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+     * POST, DELETE and a node is connecting to the cluster
+     * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+     * POST, DELETE and a node is disconnected from the cluster
+     * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+     * DELETE and the cluster is in safe mode
+     */
+    NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+            DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+
+    /**
+     * Federates the HTTP request to the nodes specified. The given URI's host
+     * and port will not be used and instead will be adjusted for each node's
+     * host and port. The node URIs are guaranteed to be constructed before
+     * issuing any requests, so if a UriConstructionException is thrown, then it
+     * is guaranteed that no request was issued.
+     *
+     * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+     * @param uri the base request URI (up to, but not including, the query
+     * string)
+     * @param parameters the request parameters
+     * @param headers the request headers
+     * @param nodeIdentifiers the NodeIdentifier for each node that the request
+     * should be replicated to
+     *
+     * @return the client response
+     *
+     * @throws NoConnectedNodesException if no nodes are connected as a result of
+     * the request
+     * @throws NoResponseFromNodesException if no response could be obtained
+     * @throws UriConstructionException if there was an issue constructing the
+     * URIs tailored for each individual node
+     * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+     * POST, DELETE and a node is connecting to the cluster
+     * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+     * POST, DELETE and a node is disconnected from the cluster
+     * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+     * DELETE and the cluster is in safe mode
+     */
+    NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers,
+            Set<NodeIdentifier> nodeIdentifiers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+            DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+
+    /**
+     * Federates the HTTP request to all connected nodes in the cluster. The
+     * given URI's host and port will not be used and instead will be adjusted
+     * for each node's host and port. The node URIs are guaranteed to be
+     * constructed before issuing any requests, so if a UriConstructionException
+     * is thrown, then it is guaranteed that no request was issued.
+     *
+     * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+     * @param uri the base request URI (up to, but not including, the query
+     * string)
+     * @param entity the HTTP request entity
+     * @param headers the request headers
+     *
+     * @return the client response
+     *
+     * @throws NoConnectedNodesException if no nodes are connected as a result of
+     * the request
+     * @throws NoResponseFromNodesException if no response could be obtained
+     * @throws UriConstructionException if there was an issue constructing the
+     * URIs tailored for each individual node
+     * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+     * POST, or DELETE and a node is connecting to the cluster
+     * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+     * POST, or DELETE and a node is disconnected from the cluster
+     * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+     * or DELETE and the cluster is in safe mode
+     */
+    NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+            DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+
+    /**
+     * Federates the HTTP request to the nodes specified. The given URI's host
+     * and port will not be used and instead will be adjusted for each node's
+     * host and port. The node URIs are guaranteed to be constructed before
+     * issuing any requests, so if a UriConstructionException is thrown, then it
+     * is guaranteed that no request was issued.
+     *
+     * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+     * @param uri the base request URI (up to, but not including, the query
+     * string)
+     * @param entity the HTTP request entity
+     * @param headers the request headers
+     * @param nodeIdentifiers the NodeIdentifier for each node that the request
+     * should be replicated to
+     *
+     * @return the client response
+     *
+     * @throws NoConnectedNodesException if no nodes are connected as a result of
+     * the request
+     * @throws NoResponseFromNodesException if no response could be obtained
+     * @throws UriConstructionException if there was an issue constructing the
+     * URIs tailored for each individual node
+     * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+     * POST, or DELETE and a node is connecting to the cluster
+     * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+     * POST, or DELETE and a node is disconnected from the cluster
+     * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+     * or DELETE and the cluster is in safe mode
+     */
+    NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers, Set<NodeIdentifier> nodeIdentifiers)
+            throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+            DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+}
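
For illustration, a minimal sketch of how a caller might federate a read-only
request through the applyRequest methods above. The declaring interface's name
precedes this excerpt, so "clusterManager" below stands in for an
implementation of it; the URI and headers are illustrative only:

    import java.net.URI;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    // Replicate a GET across the connected nodes; per-node URIs are built
    // from the prototype URI before any request is sent, so a
    // UriConstructionException here guarantees nothing went out on the wire.
    final URI uri = URI.create("http://localhost:8080/nifi-api/controller/status");
    final Map<String, List<String>> parameters = Collections.emptyMap();
    final Map<String, String> headers = Collections.singletonMap("Accept", "application/json");
    final NodeResponse response = clusterManager.applyRequest("GET", uri, parameters, headers);
    if (response.is2xx()) {
        // the cluster handled the request successfully
    }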

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
new file mode 100644
index 0000000..fb57622
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A service for managing the replication of requests to nodes. It is up to the
+ * implementing class to decide whether requests are sent concurrently or serially.
+ *
+ * Clients must call start() and stop() to initialize and shut down the instance.
+ * The instance must be started before issuing any replication requests.
+ *
+ * @author unattributed
+ */
+public interface HttpRequestReplicator {
+
+    /**
+     * Starts the instance for replicating requests. Start may only be called if
+     * the instance is not running.
+     */
+    void start();
+
+    /**
+     * Stops the instance from replicating requests. Stop may only be called if
+     * the instance is running.
+     */
+    void stop();
+
+    /**
+     * @return true if the instance is started; false otherwise.
+     */
+    boolean isRunning();
+
+    /**
+     * Requests are sent to each node in the given set. If a request results in
+     * an exception, then the corresponding NodeResponse will contain the exception.
+     *
+     * HTTP DELETE and OPTIONS methods must supply an empty parameters map or
+     * else an IllegalArgumentException is thrown.
+     *
+     * @param nodeIds the node identifiers
+     * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD,
+     * OPTIONS)
+     * @param uri the base request URI (up to, but not including, the query
+     * string)
+     * @param parameters any request parameters
+     * @param headers any HTTP headers
+     *
+     * @return the set of node responses
+     *
+     * @throws UriConstructionException if a request for a node failed to be
+     * constructed from the given prototype URI. If thrown, it is guaranteed
+     * that no request was sent.
+     */
+    Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers) throws UriConstructionException;
+
+    /**
+     * Requests are sent to each node in the given set. If a request results in
+     * an exception, then the corresponding NodeResponse will contain the exception.
+     *
+     * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an
+     * IllegalArgumentException if used.
+     *
+     * @param nodeIds the node identifiers
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query
+     * string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     *
+     * @return the set of node responses
+     *
+     * @throws UriConstructionException if a request for a node failed to be
+     * constructed from the given prototype URI. If thrown, it is guaranteed
+     * that no request was sent.
+     */
+    Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) throws UriConstructionException;
+
+}
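
A hedged usage sketch for HttpRequestReplicator, assuming "replicator" is an
implementation of the interface above and "nodeIds" identifies the target
nodes (both are assumptions; no concrete implementation appears in this file):

    import java.net.URI;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.nifi.cluster.protocol.NodeIdentifier;

    replicator.start();
    try {
        final URI uri = URI.create("http://localhost:8080/nifi-api/controller/status");
        final Map<String, List<String>> parameters = Collections.emptyMap();
        final Map<String, String> headers = Collections.emptyMap();
        // one NodeResponse per node; per-node failures are captured, not thrown
        final Set<NodeResponse> responses = replicator.replicate(nodeIds, "GET", uri, parameters, headers);
        for (final NodeResponse response : responses) {
            if (response.hasThrowable()) {
                // the request to this node failed outright
            } else if (!response.is2xx()) {
                // the node answered, but with a non-2xx status
            }
        }
    } finally {
        replicator.stop();
    }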

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
new file mode 100644
index 0000000..843a666
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.nifi.cluster.node.Node.Status;
+
+/**
+ * Maps an HTTP response to a node status.
+ *
+ * @author unattributed
+ */
+public interface HttpResponseMapper {
+
+    /**
+     * Maps an HTTP response to a node response and the corresponding node
+     * status.
+     *
+     * @param requestURI the original request URI
+     * @param nodeResponses a set of node resource responses
+     *
+     * @return a map associating the node response to the node status
+     */
+    Map<NodeResponse, Status> map(URI requestURI, Set<NodeResponse> nodeResponses);
+
+}
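
A brief sketch of consuming HttpResponseMapper, assuming "mapper" is an
implementation and that "requestUri" and "responses" come from a prior
replicated request (all three names are assumptions for illustration):

    import java.net.URI;
    import java.util.Map;
    import java.util.Set;
    import org.apache.nifi.cluster.node.Node.Status;

    // Derive a proposed cluster status for each node from its HTTP response.
    // The mapping policy (e.g., which failures mark a node disconnected) is
    // left to the implementation.
    final Map<NodeResponse, Status> statuses = mapper.map(requestUri, responses);
    for (final Map.Entry<NodeResponse, Status> entry : statuses.entrySet()) {
        System.out.println(entry.getKey().getNodeId() + " -> " + entry.getValue());
    }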

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
new file mode 100644
index 0000000..3f966e5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import com.sun.jersey.api.client.ClientResponse;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.Entity;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a node's response with regard to receiving an external API
+ * request.
+ *
+ * Both the ClientResponse and (server) Response may be obtained from this
+ * instance. The ClientResponse is stored as it is received from the node. This
+ * includes the entity input stream. The Response is constructed on demand when
+ * mapping a ClientResponse to the Response. The ClientResponse to Response
+ * mapping includes copying the ClientResponse's input stream to the Response.
+ * Therefore, the getResponse() method should not be called more than once.
+ * Furthermore, the method should not be called if the caller has already read
+ * the ClientResponse's input stream.
+ *
+ * If a ClientResponse was unable to be created, then a NodeResponse will store
+ * the Throwable, which may be obtained by calling getThrowable().
+ *
+ * This class overrides hashCode and equals and considers two instances to be
+ * equal if they have equal NodeIdentifiers.
+ *
+ * @author unattributed
+ */
+public class NodeResponse {
+
+    private static final Logger logger = LoggerFactory.getLogger(NodeResponse.class);
+    private final String httpMethod;
+    private final URI requestUri;
+    private final ClientResponse clientResponse;
+    private final NodeIdentifier nodeId;
+    private final Throwable throwable;
+    private boolean hasCreatedResponse = false;
+    private final Entity updatedEntity;
+    private final long requestDurationNanos;
+    private final String requestId;
+
+    public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final ClientResponse clientResponse, final long requestDurationNanos, final String requestId) {
+        if (nodeId == null) {
+            throw new IllegalArgumentException("Node identifier may not be null.");
+        } else if (StringUtils.isBlank(httpMethod)) {
+            throw new IllegalArgumentException("Http method may not be null or empty.");
+        } else if (requestUri == null) {
+            throw new IllegalArgumentException("Request URI may not be null.");
+        } else if (clientResponse == null) {
+            throw new IllegalArgumentException("ClientResponse may not be null.");
+        }
+        this.nodeId = nodeId;
+        this.httpMethod = httpMethod;
+        this.requestUri = requestUri;
+        this.clientResponse = clientResponse;
+        this.throwable = null;
+        this.updatedEntity = null;
+        this.requestDurationNanos = requestDurationNanos;
+        this.requestId = requestId;
+    }
+
+    public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final Throwable throwable) {
+        if (nodeId == null) {
+            throw new IllegalArgumentException("Node identifier may not be null.");
+        } else if (StringUtils.isBlank(httpMethod)) {
+            throw new IllegalArgumentException("Http method may not be null or empty.");
+        } else if (requestUri == null) {
+            throw new IllegalArgumentException("Request URI may not be null.");
+        } else if (throwable == null) {
+            throw new IllegalArgumentException("Throwable may not be null.");
+        }
+        this.nodeId = nodeId;
+        this.httpMethod = httpMethod;
+        this.requestUri = requestUri;
+        this.clientResponse = null;
+        this.throwable = throwable;
+        this.updatedEntity = null;
+        this.requestDurationNanos = -1L;
+        this.requestId = null;
+    }
+
+    public NodeResponse(final NodeResponse example, final Entity updatedEntity) {
+        Objects.requireNonNull(example, "NodeResponse cannot be null");
+        Objects.requireNonNull(updatedEntity, "UpdatedEntity cannot be null");
+
+        this.nodeId = example.nodeId;
+        this.httpMethod = example.httpMethod;
+        this.requestUri = example.requestUri;
+        this.clientResponse = example.clientResponse;
+        this.throwable = example.throwable;
+        this.updatedEntity = updatedEntity;
+        this.requestDurationNanos = example.requestDurationNanos;
+        this.requestId = null;
+    }
+
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public String getHttpMethod() {
+        return httpMethod;
+    }
+
+    public URI getRequestUri() {
+        return requestUri;
+    }
+
+    /**
+     * @return the HTTP response status code
+     */
+    public int getStatus() {
+        if (hasThrowable()) {
+            /*
+             * since there is a throwable, there is no client input stream to
+             * worry about maintaining, so we can safely call the getResponse() method
+             */
+            return getResponse().getStatus();
+        } else {
+            /*
+             * use client response's status instead of calling getResponse().getStatus()
+             * so that we don't read the client's input stream as part of creating 
+             * the response in the getResponse() method
+             */
+            return clientResponse.getStatus();
+        }
+    }
+
+    /**
+     * Returns true if the response status is 2xx, false otherwise.
+     *
+     * @return true if the response status is 2xx; false otherwise
+     */
+    public boolean is2xx() {
+        final int statusCode = getStatus();
+        return (200 <= statusCode && statusCode <= 299);
+    }
+
+    /**
+     * Returns true if the response status is 5xx, false otherwise.
+     *
+     * @return true if the response status is 5xx; false otherwise
+     */
+    public boolean is5xx() {
+        final int statusCode = getStatus();
+        return (500 <= statusCode && statusCode <= 599);
+    }
+
+    /**
+     * Returns null if hasThrowable() is true; otherwise the client's response
+     * is returned.
+     *
+     * The ClientResponse's input stream can only be read once.
+     *
+     * @return the client's response
+     */
+    public ClientResponse getClientResponse() {
+        return clientResponse;
+    }
+
+    /**
+     * Creates a Response by mapping the ClientResponse values to it. Since the
+     * ClientResponse's input stream can only be read once, this method should
+     * only be called once. Furthermore, the caller should not have already read
+     * the ClientResponse's input stream.
+     *
+     * @return the response
+     */
+    public Response getResponse() {
+        // if the response encapsulates a throwable, then the input stream is never read and the below warning is irrelevant
+        if (hasCreatedResponse && !hasThrowable()) {
+            logger.warn("ClientResponse's input stream has already been read.  The created response will not contain this data.");
+        }
+        hasCreatedResponse = true;
+        return createResponse();
+    }
+
+    /**
+     * Returns the throwable or null if no throwable exists.
+     *
+     * @return the throwable or null if no throwable exists
+     */
+    public Throwable getThrowable() {
+        return throwable;
+    }
+
+    /**
+     * Returns true if a throwable was thrown and a response could not be
+     * created; false otherwise.
+     *
+     * @return true if a throwable was thrown and a response could not be
+     * created; false otherwise
+     */
+    public boolean hasThrowable() {
+        return getThrowable() != null;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final NodeResponse other = (NodeResponse) obj;
+        if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 13 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0);
+        return hash;
+    }
+
+    public long getRequestDuration(final TimeUnit timeUnit) {
+        return timeUnit.convert(requestDurationNanos, TimeUnit.NANOSECONDS);
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
+    private Response createResponse() {
+
+        // if no client response was created, then generate a 500 response
+        if (hasThrowable()) {
+            return Response.status(Status.INTERNAL_SERVER_ERROR).build();
+        }
+
+        // set the status
+        final ResponseBuilder responseBuilder = Response.status(clientResponse.getStatus());
+
+        // set the headers
+        for (final String key : clientResponse.getHeaders().keySet()) {
+            final List<String> values = clientResponse.getHeaders().get(key);
+            for (final String value : values) {
+
+                if (key.equalsIgnoreCase("transfer-encoding") || key.equalsIgnoreCase("content-length")) {
+                    /*
+                     * do not copy the transfer-encoding header (i.e., chunked encoding) or
+                     * the content-length. Let the outgoing response builder determine it.
+                     */
+                    continue;
+                } else if (key.equals("X-ClusterContext")) {
+                    /*
+                     * do not copy the cluster context to the response because
+                     * this information is private and should not be sent to
+                     * the client
+                     */
+                    continue;
+                }
+                responseBuilder.header(key, value);
+            }
+        }
+
+        // head requests must not have a message-body in the response
+        if (!HttpMethod.HEAD.equalsIgnoreCase(httpMethod)) {
+
+            // set the entity
+            if (updatedEntity == null) {
+                responseBuilder.entity(new StreamingOutput() {
+                    @Override
+                    public void write(final OutputStream output) throws IOException, WebApplicationException {
+                        BufferedInputStream bis = null;
+                        try {
+                            bis = new BufferedInputStream(clientResponse.getEntityInputStream());
+                            IOUtils.copy(bis, output);
+                        } finally {
+                            IOUtils.closeQuietly(bis);
+                        }
+                    }
+                });
+            } else {
+                responseBuilder.entity(updatedEntity);
+            }
+        }
+
+        return responseBuilder.build();
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("NodeResponse[nodeUri=").append(nodeId.getApiAddress()).append(":").append(nodeId.getApiPort()).append(",")
+                .append("method=").append(httpMethod)
+                .append(",URI=").append(requestUri)
+                .append(",ResponseCode=").append(getStatus())
+                .append(",Duration=").append(TimeUnit.MILLISECONDS.convert(requestDurationNanos, TimeUnit.NANOSECONDS)).append(" ms]");
+        return sb.toString();
+    }
+}
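
Given the read-once constraint documented on the class, a minimal sketch of
safe NodeResponse handling (the "nodeResponse" variable is assumed to come
from a replicated request):

    import javax.ws.rs.core.Response;

    if (nodeResponse.hasThrowable()) {
        // no ClientResponse exists; getResponse() generates a 500 response
        final Response error = nodeResponse.getResponse();
    } else if (nodeResponse.is2xx()) {
        // is2xx()/getStatus() read the status from the ClientResponse without
        // consuming its entity input stream, so they are safe to call first...
        final Response mapped = nodeResponse.getResponse();
        // ...but getResponse() copies that stream, so call it at most once
    }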

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
new file mode 100644
index 0000000..49bcd35
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Represents the exceptional case when a node is blocked by the cluster's firewall.
+ */
+public class BlockedByFirewallException extends ClusterException {
+
+    private final NodeIdentifier nodeId;
+    private final boolean isExistingNode;
+
+    public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, String msg, Throwable cause) {
+        super(msg, cause);
+        this.nodeId = nodeId;
+        this.isExistingNode = isExistingNode;
+    }
+
+    public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, Throwable cause) {
+        super(cause);
+        this.nodeId = nodeId;
+        this.isExistingNode = isExistingNode;
+    }
+
+    public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, String msg) {
+        super(msg);
+        this.nodeId = nodeId;
+        this.isExistingNode = isExistingNode;
+    }
+
+    public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode) {
+        this.nodeId = nodeId;
+        this.isExistingNode = isExistingNode;
+    }
+
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public boolean isExistingNode() {
+        return isExistingNode;
+    }
+
+}
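
A small, assumed usage sketch: code that admits nodes to the cluster (the
"connectNode" helper below is hypothetical) can use the isExistingNode flag to
distinguish a blocked returning node from a blocked newcomer:

    try {
        connectNode(nodeId); // hypothetical admission logic
    } catch (final BlockedByFirewallException e) {
        if (e.isExistingNode()) {
            // a previously known node is now blocked by the firewall
        } else {
            // a brand-new node attempted to join and was blocked
        }
        System.err.println("Firewall blocked node: " + e.getNodeId());
    }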

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
new file mode 100644
index 0000000..3bf9752
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * The base exception class for cluster related exceptions.
+ *
+ * @author unattributed
+ */
+public class ClusterException extends RuntimeException {
+
+    public ClusterException() {
+    }
+
+    public ClusterException(String msg) {
+        super(msg);
+    }
+
+    public ClusterException(Throwable cause) {
+        super(cause);
+    }
+
+    public ClusterException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java
new file mode 100644
index 0000000..365b5f0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when an HTTP request that may change a node's
+ * dataflow is to be replicated while a node is connecting to the cluster.
+ *
+ * @author unattributed
+ */
+public class ConnectingNodeMutableRequestException extends MutableRequestException {
+
+    public ConnectingNodeMutableRequestException() {
+    }
+
+    public ConnectingNodeMutableRequestException(String msg) {
+        super(msg);
+    }
+
+    public ConnectingNodeMutableRequestException(Throwable cause) {
+        super(cause);
+    }
+
+    public ConnectingNodeMutableRequestException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}