You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:37 UTC
[14/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
new file mode 100644
index 0000000..702f081
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.UUID;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.nifi.cluster.flow.ClusterDataFlow;
+import org.apache.nifi.cluster.flow.DaoException;
+import org.apache.nifi.cluster.flow.DataFlowDao;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.io.BufferedInputStream;
+import org.apache.nifi.io.BufferedOutputStream;
+import org.apache.nifi.io.ByteArrayInputStream;
+import org.apache.nifi.io.StreamUtils;
+import org.apache.nifi.logging.NiFiLog;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Implements the FlowDao interface. The implementation tracks the state of the
+ * dataflow by annotating the filename of the flow state file. Specifically, the
+ * implementation correlates PersistedFlowState states to filename extensions.
+ * The correlation is as follows:
+ * <ul>
+ * <li> CURRENT maps to flow.xml </li>
+ * <li> STALE maps to flow.xml.stale </li>
+ * <li> UNKNOWN maps to flow.xml.unknown </li>
+ * </ul>
+ * Whenever the flow state changes, the flow state file's name is updated to
+ * denote its state.
+ *
+ * The implementation also provides for a restore directory that may be
+ * configured for higher availability. At instance creation, if the primary or
+ * restore directories have multiple flow state files, an exception is thrown.
+ * If the primary directory has a current flow state file, but the restore
+ * directory does not, then the primary flow state file is copied to the restore
+ * directory. If the restore directory has a current flow state file, but the
+ * primary directory does not, then the restore flow state file is copied to the
+ * primary directory. If both the primary and restore directories have a current
+ * flow state file and the files are different, then an exception is thrown.
+ *
+ * When the flow state file is saved, it is always saved first to the restore
+ * directory followed by a save to the primary directory. When the flow state
+ * file is loaded, a check is made to verify that the primary and restore flow
+ * state files are both current. If either is not current, then an exception is
+ * thrown. The primary flow state file is always read when the load method is
+ * called.
+ *
+ * @author unattributed
+ */
+public class DataFlowDaoImpl implements DataFlowDao {
+
+ private final File primaryDirectory;
+ private final File restoreDirectory;
+ private final boolean autoStart;
+ private final String generatedRootGroupId = UUID.randomUUID().toString();
+
+ public static final String STALE_EXT = ".stale";
+ public static final String UNKNOWN_EXT = ".unknown";
+ public static final String FLOW_PACKAGE = "flow.tar";
+ public static final String FLOW_XML_FILENAME = "flow.xml";
+ public static final String TEMPLATES_FILENAME = "templates.xml";
+ public static final String SNIPPETS_FILENAME = "snippets.xml";
+ public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
+
+ private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
+
+ public DataFlowDaoImpl(final File primaryDirectory) throws DaoException {
+ this(primaryDirectory, null, false);
+ }
+
+ public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException {
+
+ // sanity check that primary directory is a directory, creating it if necessary
+ if (primaryDirectory == null) {
+ throw new IllegalArgumentException("Primary directory may not be null.");
+ } else if (!primaryDirectory.exists()) {
+ if (!primaryDirectory.mkdir()) {
+ throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath()));
+ }
+ } else if (!primaryDirectory.isDirectory()) {
+ throw new IllegalArgumentException("Primary directory must be a directory.");
+ }
+
+ this.autoStart = autoStart;
+
+ try {
+ this.primaryDirectory = primaryDirectory;
+ this.restoreDirectory = restoreDirectory;
+
+ if (restoreDirectory == null) {
+ // check that we have exactly one current flow state file
+ ensureSingleCurrentStateFile(primaryDirectory);
+ } else {
+
+ // check that restore directory is a directory, creating it if necessary
+ FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
+
+ // check that restore directory is not the same as the primary directory
+ if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
+ throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ",
+ primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+ }
+
+ final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory);
+ final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory);
+
+ // if more than one state file in either primary or restore, then throw exception
+ if (primaryFlowStateFiles.length > 1) {
+ throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory));
+ } else if (restoreFlowStateFiles.length > 1) {
+ throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory));
+ }
+
+ // check that the single primary state file we found is current or create a new one
+ final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory);
+
+ // check that the single restore state file we found is current or create a new one
+ final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory);
+
+ // if there was a difference in flow state file directories, then copy the appropriate files
+ if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) {
+ // copy primary state file to restore
+ FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger);
+ } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) {
+ // copy restore state file to primary
+ FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger);
+ } else {
+ // sync the primary copy with the restore copy
+ FileUtils.syncWithRestore(primaryFlowStateFile, restoreFlowStateFile, logger);
+ }
+
+ }
+ } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) {
+ throw new DaoException(ex);
+ }
+
+ }
+
+ @Override
+ public ClusterDataFlow loadDataFlow() throws DaoException {
+ try {
+ return parseDataFlow(getExistingFlowStateFile(primaryDirectory));
+ } catch (final IOException | JAXBException ex) {
+ throw new DaoException(ex);
+ }
+ }
+
+ @Override
+ public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException {
+ try {
+
+ final File primaryStateFile = getFlowStateFile(primaryDirectory);
+
+ // write to restore before writing to primary in case primary experiences problems
+ if (restoreDirectory != null) {
+ final File restoreStateFile = getFlowStateFile(restoreDirectory);
+ if (restoreStateFile == null) {
+ if (primaryStateFile == null) {
+ writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
+ } else {
+ throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'",
+ primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+ }
+ } else {
+ if (primaryStateFile == null) {
+ throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'",
+ restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath()));
+ } else {
+ final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile);
+ final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile);
+ if (primaryFlowState == restoreFlowState) {
+ writeDataFlow(restoreStateFile, dataFlow);
+ } else {
+ throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
+ primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
+ }
+ }
+ }
+ }
+
+ // write dataflow to primary
+ if (primaryStateFile == null) {
+ writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow);
+ } else {
+ writeDataFlow(primaryStateFile, dataFlow);
+ }
+
+ } catch (final IOException | JAXBException ex) {
+ throw new DaoException(ex);
+ }
+ }
+
+ @Override
+ public PersistedFlowState getPersistedFlowState() {
+ // trust restore over primary if configured for restore
+ if (restoreDirectory == null) {
+ return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory));
+ } else {
+ return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory));
+ }
+ }
+
+ @Override
+ public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException {
+ // rename restore before primary if configured for restore
+ if (restoreDirectory != null) {
+ renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState);
+ }
+ renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState);
+ }
+
+ private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException {
+
+ // ensure that we have at most one state file and if we have one, it is current
+ final File[] directoryFlowStateFiles = getFlowStateFiles(dir);
+ if (directoryFlowStateFiles.length > 1) {
+ throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir));
+ } else if (directoryFlowStateFiles.length == 0) {
+ // create a new file if none exist
+ return createNewFlowStateFile(dir);
+ } else {
+ // check that the single flow state file is current
+ final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]);
+ if (PersistedFlowState.CURRENT == flowState) {
+ return directoryFlowStateFiles[0];
+ } else {
+ throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath()));
+ }
+ }
+
+ }
+
+ private PersistedFlowState getPersistedFlowState(final File file) {
+ final String path = file.getAbsolutePath();
+ if (path.endsWith(STALE_EXT)) {
+ return PersistedFlowState.STALE;
+ } else if (path.endsWith(UNKNOWN_EXT)) {
+ return PersistedFlowState.UNKNOWN;
+ } else {
+ return PersistedFlowState.CURRENT;
+ }
+ }
+
+ private File getFlowStateFile(final File dir) {
+ final File[] files = getFlowStateFiles(dir);
+ if (files.length > 1) {
+ throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length));
+ } else if (files.length == 0) {
+ return null;
+ } else {
+ return files[0];
+ }
+ }
+
+ private File getExistingFlowStateFile(final File dir) {
+ final File file = getFlowStateFile(dir);
+ if (file == null) {
+ throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath()));
+ }
+ return file;
+ }
+
+ private File[] getFlowStateFiles(final File dir) {
+ final File[] files = dir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT));
+ }
+ });
+
+ if (files == null) {
+ return new File[0];
+ } else {
+ return files;
+ }
+ }
+
+ private File removeStateFileExtension(final File file) {
+
+ final String path = file.getAbsolutePath();
+ final int stateFileExtIndex;
+ if (path.endsWith(STALE_EXT)) {
+ stateFileExtIndex = path.lastIndexOf(STALE_EXT);
+ } else if (path.endsWith(UNKNOWN_EXT)) {
+ stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT);
+ } else {
+ stateFileExtIndex = path.length();
+ }
+
+ return new File(path.substring(0, stateFileExtIndex));
+ }
+
+ private File addStateFileExtension(final File file, final PersistedFlowState state) {
+ switch (state) {
+ case CURRENT: {
+ return file;
+ }
+ case STALE: {
+ return new File(file.getAbsolutePath() + STALE_EXT);
+ }
+ case UNKNOWN: {
+ return new File(file.getAbsolutePath() + UNKNOWN_EXT);
+ }
+ default: {
+ throw new RuntimeException("Unsupported PersistedFlowState Enum value: " + state);
+ }
+ }
+ }
+
+ private File createNewFlowStateFile(final File dir) throws IOException, JAXBException {
+ final File stateFile = new File(dir, FLOW_PACKAGE);
+ stateFile.createNewFile();
+
+ final byte[] flowBytes = getEmptyFlowBytes();
+ final byte[] templateBytes = new byte[0];
+ final byte[] snippetBytes = new byte[0];
+ final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
+
+ final ClusterMetadata clusterMetadata = new ClusterMetadata();
+ writeDataFlow(stateFile, dataFlow, clusterMetadata);
+
+ return stateFile;
+ }
+
+ private byte[] getEmptyFlowBytes() throws IOException {
+ try {
+ final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+ final Document document = docBuilder.newDocument();
+
+ final Element controller = document.createElement("flowController");
+ document.appendChild(controller);
+
+ controller.appendChild(createTextElement(document, "maxThreadCount", "15"));
+
+ final Element rootGroup = document.createElement("rootGroup");
+ rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId));
+ rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow"));
+
+ // create the position element
+ final Element positionElement = createTextElement(document, "position", "");
+ positionElement.setAttribute("x", "0.0");
+ positionElement.setAttribute("y", "0.0");
+ rootGroup.appendChild(positionElement);
+
+ rootGroup.appendChild(createTextElement(document, "comment", ""));
+ controller.appendChild(rootGroup);
+
+ final Transformer transformer = TransformerFactory.newInstance().newTransformer();
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+ final DOMSource source = new DOMSource(document);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final StreamResult result = new StreamResult(baos);
+ transformer.transform(source, result);
+
+ return baos.toByteArray();
+ } catch (final Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Element createTextElement(final Document document, final String elementName, final String value) {
+ final Element element = document.createElement(elementName);
+ element.setTextContent(value);
+ return element;
+ }
+
+ private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException {
+ final PersistedFlowState existingState = getPersistedFlowState(flowStateFile);
+ if (existingState != newState) {
+ final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState);
+ if (flowStateFile.renameTo(newFlowStateFile) == false) {
+ throw new DaoException(
+ String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath()));
+ }
+ }
+ }
+
+ private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException {
+ byte[] flowBytes = new byte[0];
+ byte[] templateBytes = new byte[0];
+ byte[] snippetBytes = new byte[0];
+ byte[] clusterInfoBytes = new byte[0];
+
+ try (final InputStream inStream = new FileInputStream(file);
+ final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
+ TarArchiveEntry tarEntry;
+ while ((tarEntry = tarIn.getNextTarEntry()) != null) {
+ switch (tarEntry.getName()) {
+ case FLOW_XML_FILENAME:
+ flowBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, flowBytes, true);
+ break;
+ case TEMPLATES_FILENAME:
+ templateBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, templateBytes, true);
+ break;
+ case SNIPPETS_FILENAME:
+ snippetBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, snippetBytes, true);
+ break;
+ case CLUSTER_INFO_FILENAME:
+ clusterInfoBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
+ break;
+ default:
+ throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
+ }
+ }
+ }
+
+ final ClusterMetadata clusterMetadata;
+ if (clusterInfoBytes.length == 0) {
+ clusterMetadata = null;
+ } else {
+ final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller();
+ clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes));
+ }
+
+ final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
+ dataFlow.setAutoStartProcessors(autoStart);
+
+ return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
+ }
+
+ private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
+
+ // get the data flow
+ DataFlow dataFlow = clusterDataFlow.getDataFlow();
+
+ // if no dataflow, then write a new dataflow
+ if (dataFlow == null) {
+ dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
+ }
+
+ // setup the cluster metadata
+ final ClusterMetadata clusterMetadata = new ClusterMetadata();
+ clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
+
+ // write to disk
+ writeDataFlow(file, dataFlow, clusterMetadata);
+ }
+
+ private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
+ final TarArchiveEntry flowEntry = new TarArchiveEntry(filename);
+ flowEntry.setSize(bytes.length);
+ tarOut.putArchiveEntry(flowEntry);
+ tarOut.write(bytes);
+ tarOut.closeArchiveEntry();
+ }
+
+ private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
+
+ try (final OutputStream fos = new FileOutputStream(file);
+ final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
+
+ writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
+ writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
+ writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
+ writeClusterMetadata(clusterMetadata, baos);
+ final byte[] clusterInfoBytes = baos.toByteArray();
+
+ writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes);
+ }
+ }
+
+ private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException {
+ // write cluster metadata to output stream
+ final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+ marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
+ marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
+ marshaller.marshal(clusterMetadata, os);
+ }
+
+ @XmlRootElement(name = "clusterMetadata")
+ private static class ClusterMetadata {
+
+ private NodeIdentifier primaryNodeId;
+
+ private static final JAXBContext jaxbCtx;
+
+ static {
+ try {
+ jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class);
+ } catch (final JAXBException je) {
+ throw new RuntimeException(je);
+ }
+ }
+
+ @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+ public NodeIdentifier getPrimaryNodeId() {
+ return primaryNodeId;
+ }
+
+ public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) {
+ this.primaryNodeId = primaryNodeId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
new file mode 100644
index 0000000..e135af3
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.flow.impl;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.cluster.flow.ClusterDataFlow;
+import org.apache.nifi.cluster.flow.DaoException;
+import org.apache.nifi.cluster.flow.DataFlowDao;
+import org.apache.nifi.cluster.flow.DataFlowManagementService;
+import org.apache.nifi.cluster.flow.PersistedFlowState;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.FormatUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements FlowManagementService interface. The service tries to keep the
+ * cluster's flow current with regards to the available nodes.
+ *
+ * The instance may be configured with a retrieval delay, which will reduce the
+ * number of retrievals performed by the service at the expense of increasing
+ * the chances that the service will not be able to provide a current flow to
+ * the caller.
+ *
+ * By default, the service will try to update the flow as quickly as possible.
+ * Configuring a delay enables a less aggressive retrieval strategy.
+ * Specifically, the eligible retrieval time is reset every time the flow state
+ * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow
+ * will not be retrieved.
+ *
+ * @author unattributed
+ */
+public class DataFlowManagementServiceImpl implements DataFlowManagementService {
+
+ /*
+ * Developer Note:
+ *
+ * This class maintains an ExecutorService and a Runnable.
+ * Although the class is not externally threadsafe, its internals are protected to
+ * accommodate multithread access between the ExecutorServer and the Runnable.
+ *
+ */
+ private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
+
+ private final DataFlowDao flowDao;
+
+ private final ClusterManagerProtocolSender sender;
+
+ private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>();
+
+ private final AtomicBoolean stopRequested = new AtomicBoolean(false);
+
+ private final AtomicLong lastRetrievalTime = new AtomicLong(-1);
+
+ private Timer flowRetriever;
+
+ private long retrievableAfterTime = 0L;
+
+ private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0);
+
+ private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock());
+
+ public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) {
+ if (flowDao == null) {
+ throw new IllegalArgumentException("Flow DAO may not be null.");
+ } else if (sender == null) {
+ throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
+ }
+ this.flowDao = flowDao;
+ this.sender = sender;
+ }
+
+ @Override
+ public void start() {
+
+ if (isRunning()) {
+ throw new IllegalArgumentException("Instance is already running.");
+ }
+
+ // reset stop requested
+ stopRequested.set(false);
+
+ // setup flow retreiver timer
+ flowRetriever = new Timer("Flow Management Service", /* is daemon */ true);
+ flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500);
+ }
+
+ @Override
+ public boolean isRunning() {
+ return (flowRetriever != null);
+ }
+
+ @Override
+ public void stop() {
+
+ if (isRunning() == false) {
+ throw new IllegalArgumentException("Instance is already stopped.");
+ }
+
+ // record stop request
+ stopRequested.set(true);
+
+ flowRetriever.cancel();
+ flowRetriever = null;
+
+ }
+
+ @Override
+ public ClusterDataFlow loadDataFlow() throws DaoException {
+ resourceLock.lock();
+ try {
+ return flowDao.loadDataFlow();
+ } finally {
+ resourceLock.unlock("loadDataFlow");
+ }
+ }
+
+ @Override
+ public void updatePrimaryNode(final NodeIdentifier nodeId) {
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+ final StandardDataFlow dataFlow;
+ if (existingClusterDataFlow == null) {
+ dataFlow = null;
+ } else {
+ dataFlow = existingClusterDataFlow.getDataFlow();
+ }
+
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
+ } finally {
+ resourceLock.unlock("updatePrimaryNode");
+ }
+ }
+
+ @Override
+ public PersistedFlowState getPersistedFlowState() {
+ resourceLock.lock();
+ try {
+ return flowDao.getPersistedFlowState();
+ } finally {
+ resourceLock.unlock("getPersistedFlowState");
+ }
+ }
+
+ @Override
+ public boolean isFlowCurrent() {
+ return PersistedFlowState.CURRENT == getPersistedFlowState();
+ }
+
+ @Override
+ public void setPersistedFlowState(final PersistedFlowState flowState) {
+ // lock to ensure state change and retrievable time update are atomic
+ resourceLock.lock();
+ try {
+ flowDao.setPersistedFlowState(flowState);
+ if (PersistedFlowState.STALE == flowState) {
+ retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000);
+ } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) {
+ retrievableAfterTime = Long.MAX_VALUE;
+ }
+ } finally {
+ resourceLock.unlock("setPersistedFlowState");
+ }
+ }
+
+ @Override
+ public Set<NodeIdentifier> getNodeIds() {
+ return Collections.unmodifiableSet(nodeIds);
+ }
+
+ @Override
+ public void setNodeIds(final Set<NodeIdentifier> nodeIds) {
+
+ if (nodeIds == null) {
+ throw new IllegalArgumentException("Node IDs may not be null.");
+ }
+
+ resourceLock.lock();
+ try {
+
+ if (this.nodeIds.equals(nodeIds)) {
+ return;
+ }
+
+ this.nodeIds.clear();
+ this.nodeIds.addAll(nodeIds);
+
+ } finally {
+ resourceLock.unlock("setNodeIds");
+ }
+
+ }
+
+ @Override
+ public int getRetrievalDelaySeconds() {
+ return retrievalDelaySeconds.get();
+ }
+
+ @Override
+ public void setRetrievalDelay(final String retrievalDelay) {
+ this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS));
+ }
+
+ public ClusterManagerProtocolSender getSender() {
+ return sender;
+ }
+
+ public long getLastRetrievalTime() {
+ return lastRetrievalTime.get();
+ }
+
+ /**
+ * A timer task for issuing FlowRequestMessage messages to nodes to retrieve
+ * an updated flow.
+ */
+ private class FlowRetrieverTimerTask extends TimerTask {
+
+ @Override
+ public void run() {
+
+ resourceLock.lock();
+ try {
+ // if flow is current, then we're done
+ if (isFlowCurrent()) {
+ return;
+ }
+ } catch (final Exception ex) {
+ logger.info("Encountered exception checking if flow is current caused by " + ex, ex);
+ } finally {
+ resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
+ }
+
+ final FlowRequestMessage request = new FlowRequestMessage();
+ for (final NodeIdentifier nodeId : getNodeIds()) {
+ try {
+ // setup request
+ request.setNodeId(nodeId);
+
+ // record request time
+ final long requestSentTime = new Date().getTime();
+
+ resourceLock.lock();
+ try {
+ // sanity checks before making request
+ if (stopRequested.get()) { // did we receive a stop request
+ logger.debug("Stopping runnable prematurely because a request to stop was issued.");
+ return;
+ } else if (requestSentTime < retrievableAfterTime) {
+ /*
+ * Retrievable after time was updated while obtaining
+ * the lock, so try again later
+ */
+ return;
+ }
+ } finally {
+ resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
+ }
+
+ // send request
+ final FlowResponseMessage response = sender.requestFlow(request);
+
+ resourceLock.lock();
+ try {
+ // check if the retrieved flow is still valid
+ if (requestSentTime > retrievableAfterTime) {
+ logger.info("Saving retrieved flow.");
+
+ final StandardDataFlow dataFlow = response.getDataFlow();
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+ final ClusterDataFlow currentClusterDataFlow;
+ if (existingClusterDataFlow == null) {
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
+ } else {
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
+ }
+ flowDao.saveDataFlow(currentClusterDataFlow);
+ flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
+ lastRetrievalTime.set(new Date().getTime());
+ }
+
+ /*
+ * Retrievable after time was updated while requesting
+ * the flow, so try again later.
+ */
+ } finally {
+ resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
+ }
+
+ } catch (final Throwable t) {
+ logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t);
+ }
+ }
+ }
+ }
+
+ private static class TimingReentrantLock {
+
+ private final Lock lock;
+ private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock");
+
+ private final ThreadLocal<Long> lockTime = new ThreadLocal<>();
+
+ public TimingReentrantLock(final Lock lock) {
+ this.lock = lock;
+ }
+
+ public void lock() {
+ lock.lock();
+ lockTime.set(System.nanoTime());
+ }
+
+ public void unlock(final String task) {
+ final long nanosLocked = System.nanoTime() - lockTime.get();
+ lock.unlock();
+
+ final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS);
+ if (millisLocked > 100L) {
+ logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
new file mode 100644
index 0000000..0fcac8c
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
+import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
+import org.apache.nifi.cluster.NodeInformant;
+import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.reporting.BulletinRepository;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Defines the interface for a ClusterManager. The cluster manager is a
+ * threadsafe centralized manager for a cluster. Members of a cluster are nodes.
+ * A member becomes a node by issuing a connection request to the manager. The
+ * manager maintains the set of nodes. Nodes may be disconnected, reconnected,
+ * and deleted.
+ *
+ * Nodes are responsible for sending heartbeats to the manager to indicate their
+ * liveliness. A manager may disconnect a node if it does not receive a
+ * heartbeat within a configurable time period. A cluster manager instance may
+ * be configured with how often to monitor received heartbeats
+ * (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may
+ * elapse between node heartbeats before disconnecting the node
+ * (getMaxHeartbeatGapSeconds()).
+ *
+ * Since only a single node may execute isolated processors, the cluster manager
+ * maintains the notion of a primary node. The primary node is chosen at cluster
+ * startup and retains the role until a user requests a different node to be the
+ * primary node.
+ *
+ * @author unattributed
+ */
+public interface ClusterManager extends NodeInformant {
+
+ /**
+ * Handles a node's heartbeat.
+ *
+ * @param heartbeat a heartbeat
+ *
+ */
+ void handleHeartbeat(Heartbeat heartbeat);
+
+ /**
+ * @param statuses the statuses of the nodes
+ * @return the set of nodes
+ */
+ Set<Node> getNodes(Status... statuses);
+
+ /**
+ * @param nodeId
+ * @return returns the node with the given identifier or null if node does
+ * not exist
+ */
+ Node getNode(String nodeId);
+
+ /**
+ * @param statuses
+ * @return the set of node identifiers with the given node status
+ */
+ Set<NodeIdentifier> getNodeIds(Status... statuses);
+
+ /**
+ * Deletes the node with the given node identifier. If the given node is the
+ * primary node, then a subsequent request may be made to the manager to set
+ * a new primary node.
+ *
+ * @param nodeId the node identifier
+ * @param userDn the Distinguished Name of the user requesting the node be
+ * deleted from the cluster
+ *
+ * @throws UnknownNodeException if the node does not exist
+ * @throws IllegalNodeDeletionException if the node is not in a disconnected
+ * state
+ */
+ void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException;
+
+ /**
+ * Requests a connection to the cluster.
+ *
+ * @param request the request
+ *
+ * @return the response
+ */
+ ConnectionResponse requestConnection(ConnectionRequest request);
+
+ /**
+ * Services reconnection requests for a given node. If the node indicates
+ * reconnection failure, then the node will be set to disconnected.
+ * Otherwise, a reconnection request will be sent to the node, initiating
+ * the connection handshake.
+ *
+ * @param nodeId a node identifier
+ * @param userDn the Distinguished Name of the user requesting the
+ * reconnection
+ *
+ * @throws UnknownNodeException if the node does not exist
+ * @throws IllegalNodeReconnectionException if the node is not disconnected
+ */
+ void requestReconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeReconnectionException;
+
+ /**
+ * Requests the node with the given identifier be disconnected.
+ *
+ * @param nodeId the node identifier
+ * @param userDn the Distinguished Name of the user requesting the
+ * disconnection
+ *
+ * @throws UnknownNodeException if the node does not exist
+ * @throws IllegalNodeDisconnectionException if the node cannot be
+ * disconnected due to the cluster's state (e.g., node is last connected
+ * node or node is primary)
+ * @throws UnknownNodeException if the node does not exist
+ * @throws IllegalNodeDisconnectionException if the node is not disconnected
+ * @throws NodeDisconnectionException if the disconnection failed
+ */
+ void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException;
+
+ /**
+ * @return the time in seconds to wait between successive executions of
+ * heartbeat monitoring
+ */
+ int getHeartbeatMonitoringIntervalSeconds();
+
+ /**
+ * @return the maximum time in seconds that is allowed between successive
+ * heartbeats of a node before disconnecting the node
+ */
+ int getMaxHeartbeatGapSeconds();
+
+ /**
+ * Returns a list of node events for the node with the given identifier. The
+ * events will be returned in order of most recent to least recent according
+ * to the creation date of the event.
+ *
+ * @param nodeId the node identifier
+ *
+ * @return the list of events or an empty list if no node exists with the
+ * given identifier
+ */
+ List<Event> getNodeEvents(final String nodeId);
+
+ /**
+ * Revokes the primary role from the current primary node and assigns the
+ * primary role to given given node ID.
+ *
+ * If role revocation fails, then the current primary node is set to
+ * disconnected while retaining the primary role and no role assignment is
+ * performed.
+ *
+ * If role assignment fails, then the given node is set to disconnected and
+ * is given the primary role.
+ *
+ * @param nodeId the node identifier
+ * @param userDn the Distinguished Name of the user requesting that the
+ * Primary Node be assigned
+ *
+ * @throws UnknownNodeException if the node with the given identifier does
+ * not exist
+ * @throws IneligiblePrimaryNodeException if the node with the given
+ * identifier is not eligible to be the primary node
+ * @throws PrimaryRoleAssignmentException if the cluster was unable to
+ * change the primary role to the requested node
+ */
+ void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
+
+ /**
+ * @return the primary node of the cluster or null if no primary node exists
+ */
+ Node getPrimaryNode();
+
+ /**
+ * Returns the bulletin repository.
+ *
+ * @return
+ */
+ BulletinRepository getBulletinRepository();
+
+ /**
+ * Returns a {@link ProcessGroupStatus} that represents the status of all
+ * nodes with the given {@link Status}es for the given ProcessGroup id, or
+ * null if no nodes exist with the given statuses
+ *
+ * @param groupId
+ * @return
+ */
+ ProcessGroupStatus getProcessGroupStatus(String groupId);
+
+ /**
+ * Returns a merged representation of the System Diagnostics for all nodes
+ * in the cluster
+ *
+ * @return
+ */
+ SystemDiagnostics getSystemDiagnostics();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
new file mode 100644
index 0000000..2cf5812
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
+import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
+import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Extends the ClusterManager interface to define how requests issued to the
+ * cluster manager are federated to the nodes. Specifically, the HTTP protocol
+ * is used for communicating requests to the cluster manager and to the nodes.
+ *
+ * @author unattributed
+ */
+public interface HttpClusterManager extends ClusterManager {
+
+ /**
+ * Federates the HTTP request to all connected nodes in the cluster. The
+ * given URI's host and port will not be used and instead will be adjusted
+ * for each node's host and port. The node URIs are guaranteed to be
+ * constructed before issuing any requests, so if a UriConstructionException
+ * is thrown, then it is guaranteed that no request was issued.
+ *
+ * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+ * @param uri the base request URI (up to, but not including, the query
+ * string)
+ * @param parameters the request parameters
+ * @param headers the request headers
+ *
+ * @return the client response
+ *
+ * @throws NoConnectedNodesException if no nodes are connected as results of
+ * the request
+ * @throws NoResponseFromNodesException if no response could be obtained
+ * @throws UriConstructionException if there was an issue constructing the
+ * URIs tailored for each individual node
+ * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is connecting to the cluster
+ * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is disconnected from the cluster
+ * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+ * DELETE and a the cluster is in safe mode
+ */
+ NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers)
+ throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+ DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+
+ /**
+ * Federates the HTTP request to the nodes specified. The given URI's host
+ * and port will not be used and instead will be adjusted for each node's
+ * host and port. The node URIs are guaranteed to be constructed before
+ * issuing any requests, so if a UriConstructionException is thrown, then it
+ * is guaranteed that no request was issued.
+ *
+ * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+ * @param uri the base request URI (up to, but not including, the query
+ * string)
+ * @param parameters the request parameters
+ * @param headers the request headers
+ * @param nodeIdentifiers the NodeIdentifier for each node that the request
+ * should be replaced to
+ *
+ * @return the client response
+ *
+ * @throws NoConnectedNodesException if no nodes are connected as results of
+ * the request
+ * @throws NoResponseFromNodesException if no response could be obtained
+ * @throws UriConstructionException if there was an issue constructing the
+ * URIs tailored for each individual node
+ * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is connecting to the cluster
+ * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is disconnected from the cluster
+ * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+ * DELETE and a the cluster is in safe mode
+ */
+ NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers,
+ Set<NodeIdentifier> nodeIdentifiers)
+ throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+ DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+
+ /**
+ * Federates the HTTP request to all connected nodes in the cluster. The
+ * given URI's host and port will not be used and instead will be adjusted
+ * for each node's host and port. The node URIs are guaranteed to be
+ * constructed before issuing any requests, so if a UriConstructionException
+ * is thrown, then it is guaranteed that no request was issued.
+ *
+ * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+ * @param uri the base request URI (up to, but not including, the query
+ * string)
+ * @param entity the HTTP request entity
+ * @param headers the request headers
+ *
+ * @return the client response
+ *
+ * @throws NoConnectedNodesException if no nodes are connected as results of
+ * the request
+ * @throws NoResponseFromNodesException if no response could be obtained
+ * @throws UriConstructionException if there was an issue constructing the
+ * URIs tailored for each individual node
+ * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is connecting to the cluster
+ * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is disconnected from the cluster
+ * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+ * DELETE and a the cluster is in safe mode
+ */
+ NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers)
+ throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+ DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+
+ /**
+ * Federates the HTTP request to the nodes specified. The given URI's host
+ * and port will not be used and instead will be adjusted for each node's
+ * host and port. The node URIs are guaranteed to be constructed before
+ * issuing any requests, so if a UriConstructionException is thrown, then it
+ * is guaranteed that no request was issued.
+ *
+ * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
+ * @param uri the base request URI (up to, but not including, the query
+ * string)
+ * @param entity the HTTP request entity
+ * @param headers the request headers
+ * @param nodeIdentifiers the NodeIdentifier for each node that the request
+ * should be replaced to
+ *
+ * @return the client response
+ *
+ * @throws NoConnectedNodesException if no nodes are connected as results of
+ * the request
+ * @throws NoResponseFromNodesException if no response could be obtained
+ * @throws UriConstructionException if there was an issue constructing the
+ * URIs tailored for each individual node
+ * @throws ConnectingNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is connecting to the cluster
+ * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
+ * POST, DELETE and a node is disconnected from the cluster
+ * @throws SafeModeMutableRequestException if the request was a PUT, POST,
+ * DELETE and a the cluster is in safe mode
+ */
+ NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers, Set<NodeIdentifier> nodeIdentifiers)
+ throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
+ DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
new file mode 100644
index 0000000..fb57622
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A service for managing the replication of requests to nodes. It is up to the
+ * implementing class to decide if requests are sent concurrently or serially.
+ *
+ * Clients must call start() and stop() to initialize and shutdown the instance.
+ * The instance must be started before issuing any replication requests.
+ *
+ * @author unattributed
+ */
+public interface HttpRequestReplicator {
+
+ /**
+ * Starts the instance for replicating requests. Start may only be called if
+ * the instance is not running.
+ */
+ void start();
+
+ /**
+ * Stops the instance from replicating requests. Stop may only be called if
+ * the instance is running.
+ */
+ void stop();
+
+ /**
+ * @return true if the instance is started; false otherwise.
+ */
+ boolean isRunning();
+
+ /**
+ * Requests are sent to each node in the cluster. If the request results in
+ * an exception, then the NodeResourceResponse will contain the exception.
+ *
+ * HTTP DELETE and OPTIONS methods must supply an empty parameters map or
+ * else and IllegalArgumentException is thrown.
+ *
+ * @param nodeIds the node identifiers
+ * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD,
+ * OPTIONS)
+ * @param uri the base request URI (up to, but not including, the query
+ * string)
+ * @param parameters any request parameters
+ * @param headers any HTTP headers
+ *
+ * @return the set of node responses
+ *
+ * @throws UriConstructionException if a request for a node failed to be
+ * constructed from the given prototype URI. If thrown, it is guaranteed
+ * that no request was sent.
+ */
+ Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers) throws UriConstructionException;
+
+ /**
+ * Requests are sent to each node in the cluster. If the request results in
+ * an exception, then the NodeResourceResponse will contain the exception.
+ *
+ * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an
+ * IllegalArgumentException if used.
+ *
+ * @param nodeIds the node identifiers
+ * @param method the HTTP method (e.g., POST, PUT)
+ * @param uri the base request URI (up to, but not including, the query
+ * string)
+ * @param entity an entity
+ * @param headers any HTTP headers
+ *
+ * @return the set of node responses
+ *
+ * @throws UriConstructionException if a request for a node failed to be
+ * constructed from the given prototype URI. If thrown, it is guaranteed
+ * that no request was sent.
+ */
+ Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) throws UriConstructionException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
new file mode 100644
index 0000000..843a666
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import org.apache.nifi.cluster.node.Node.Status;
+
+/**
+ * Maps a HTTP response to a node status.
+ *
+ * @author unattributed
+ */
+public interface HttpResponseMapper {
+
+ /**
+ * Maps a HTTP response to a node response and the corresponding node
+ * status.
+ *
+ * @param requestURI the original request URI
+ * @param nodeResponses a set of node resource responses
+ *
+ * @return a map associating the node response to the node status
+ */
+ Map<NodeResponse, Status> map(URI requestURI, Set<NodeResponse> nodeResponses);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
new file mode 100644
index 0000000..3f966e5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager;
+
+import com.sun.jersey.api.client.ClientResponse;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.Entity;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a node's response in regards to receiving a external API
+ * request.
+ *
+ * Both the ClientResponse and (server) Response may be obtained from this
+ * instance. The ClientResponse is stored as it is received from the node. This
+ * includes the entity input stream. The Response is constructed on demand when
+ * mapping a ClientResponse to the Response. The ClientResponse to Response
+ * mapping includes copying the ClientResponse's input stream to the Response.
+ * Therefore, the getResponse() method should not be called more than once.
+ * Furthermore, the method should not be called if the caller has already read
+ * the ClientResponse's input stream.
+ *
+ * If a ClientResponse was unable to be created, then a NodeResponse will store
+ * the Throwable, which may be obtained by calling getThrowable().
+ *
+ * This class overrides hashCode and equals and considers two instances to be
+ * equal if they have the equal NodeIdentifiers.
+ *
+ * @author unattributed
+ */
+public class NodeResponse {
+
+ private static final Logger logger = LoggerFactory.getLogger(NodeResponse.class);
+ private final String httpMethod;
+ private final URI requestUri;
+ private final ClientResponse clientResponse;
+ private final NodeIdentifier nodeId;
+ private final Throwable throwable;
+ private boolean hasCreatedResponse = false;
+ private final Entity updatedEntity;
+ private final long requestDurationNanos;
+ private final String requestId;
+
+ public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final ClientResponse clientResponse, final long requestDurationNanos, final String requestId) {
+ if (nodeId == null) {
+ throw new IllegalArgumentException("Node identifier may not be null.");
+ } else if (StringUtils.isBlank(httpMethod)) {
+ throw new IllegalArgumentException("Http method may not be null or empty.");
+ } else if (requestUri == null) {
+ throw new IllegalArgumentException("Request URI may not be null.");
+ } else if (clientResponse == null) {
+ throw new IllegalArgumentException("ClientResponse may not be null.");
+ }
+ this.nodeId = nodeId;
+ this.httpMethod = httpMethod;
+ this.requestUri = requestUri;
+ this.clientResponse = clientResponse;
+ this.throwable = null;
+ this.updatedEntity = null;
+ this.requestDurationNanos = requestDurationNanos;
+ this.requestId = requestId;
+ }
+
+ public NodeResponse(final NodeIdentifier nodeId, final String httpMethod, final URI requestUri, final Throwable throwable) {
+ if (nodeId == null) {
+ throw new IllegalArgumentException("Node identifier may not be null.");
+ } else if (StringUtils.isBlank(httpMethod)) {
+ throw new IllegalArgumentException("Http method may not be null or empty.");
+ } else if (requestUri == null) {
+ throw new IllegalArgumentException("Request URI may not be null.");
+ } else if (throwable == null) {
+ throw new IllegalArgumentException("Throwable may not be null.");
+ }
+ this.nodeId = nodeId;
+ this.httpMethod = httpMethod;
+ this.requestUri = requestUri;
+ this.clientResponse = null;
+ this.throwable = throwable;
+ this.updatedEntity = null;
+ this.requestDurationNanos = -1L;
+ this.requestId = null;
+ }
+
+ public NodeResponse(final NodeResponse example, final Entity updatedEntity) {
+ Objects.requireNonNull(example, "NodeResponse cannot be null");
+ Objects.requireNonNull(updatedEntity, "UpdatedEntity cannot be null");
+
+ this.nodeId = example.nodeId;
+ this.httpMethod = example.httpMethod;
+ this.requestUri = example.requestUri;
+ this.clientResponse = example.clientResponse;
+ this.throwable = example.throwable;
+ this.updatedEntity = updatedEntity;
+ this.requestDurationNanos = example.requestDurationNanos;
+ this.requestId = null;
+ }
+
+ public NodeIdentifier getNodeId() {
+ return nodeId;
+ }
+
+ public String getHttpMethod() {
+ return httpMethod;
+ }
+
+ public URI getRequestUri() {
+ return requestUri;
+ }
+
+ /**
+ * @return the HTTP response status code
+ */
+ public int getStatus() {
+ if (hasThrowable()) {
+ /*
+ * since there is a throwable, there is no client input stream to
+ * worry about maintaining, so we can call getResponse() method
+ */
+ return getResponse().getStatus();
+ } else {
+ /*
+ * use client response's status instead of calling getResponse().getStatus()
+ * so that we don't read the client's input stream as part of creating
+ * the response in the getResponse() method
+ */
+ return clientResponse.getStatus();
+ }
+ }
+
+ /**
+ * Returns true if the response status is 2xx, false otherwise.
+ *
+ * @return
+ */
+ public boolean is2xx() {
+ final int statusCode = getStatus();
+ return (200 <= statusCode && statusCode <= 299);
+ }
+
+ /**
+ * Returns true if the response status is 5xx, false otherwise.
+ *
+ * @return
+ */
+ public boolean is5xx() {
+ final int statusCode = getStatus();
+ return (500 <= statusCode && statusCode <= 599);
+ }
+
+ /**
+ * Returns null if hasThrowable() is true; otherwise the client's response
+ * is returned.
+ *
+ * The ClientResponse's input stream can only be read once.
+ *
+ * @return the client's response
+ */
+ public ClientResponse getClientResponse() {
+ return clientResponse;
+ }
+
+ /**
+ * Creates a Response by mapping the ClientResponse values to it. Since the
+ * ClientResponse's input stream can only be read once, this method should
+ * only be called once. Furthermore, the caller should not have already read
+ * the ClientResponse's input stream.
+ *
+ * @return the response
+ */
+ public Response getResponse() {
+ // if the response encapsulates a throwable, then the input stream is never read and the below warning is irrelevant
+ if (hasCreatedResponse && !hasThrowable()) {
+ logger.warn("ClientResponse's input stream has already been read. The created response will not contain this data.");
+ }
+ hasCreatedResponse = true;
+ return createResponse();
+ }
+
+ /**
+ * Returns the throwable or null if no throwable exists.
+ *
+ * @return the throwable or null if no throwable exists
+ */
+ public Throwable getThrowable() {
+ return throwable;
+ }
+
+ /**
+ * Returns true if a throwable was thrown and a response was not able to be
+ * created; false otherwise.
+ *
+ * @return true if a throwable was thrown and a response was not able to be
+ * created; false otherwise
+ */
+ public boolean hasThrowable() {
+ return getThrowable() != null;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final NodeResponse other = (NodeResponse) obj;
+ if (this.nodeId != other.nodeId && (this.nodeId == null || !this.nodeId.equals(other.nodeId))) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 7;
+ hash = 13 * hash + (this.nodeId != null ? this.nodeId.hashCode() : 0);
+ return hash;
+ }
+
+ public long getRequestDuration(final TimeUnit timeUnit) {
+ return timeUnit.convert(requestDurationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ private Response createResponse() {
+
+ // if no client response was created, then generate a 500 response
+ if (hasThrowable()) {
+ return Response.status(Status.INTERNAL_SERVER_ERROR).build();
+ }
+
+ // set the status
+ final ResponseBuilder responseBuilder = Response.status(clientResponse.getStatus());
+
+ // set the headers
+ for (final String key : clientResponse.getHeaders().keySet()) {
+ final List<String> values = clientResponse.getHeaders().get(key);
+ for (final String value : values) {
+
+ if (key.equalsIgnoreCase("transfer-encoding") || key.equalsIgnoreCase("content-length")) {
+ /*
+ * do not copy the transfer-encoding header (i.e., chunked encoding) or
+ * the content-length. Let the outgoing response builder determine it.
+ */
+ continue;
+ } else if (key.equals("X-ClusterContext")) {
+ /*
+ * do not copy the cluster context to the response because
+ * this information is private and should not be sent to
+ * the client
+ */
+ continue;
+ }
+ responseBuilder.header(key, value);
+ }
+ }
+
+ // head requests must not have a message-body in the response
+ if (!HttpMethod.HEAD.equalsIgnoreCase(httpMethod)) {
+
+ // set the entity
+ if (updatedEntity == null) {
+ responseBuilder.entity(new StreamingOutput() {
+ @Override
+ public void write(final OutputStream output) throws IOException, WebApplicationException {
+ BufferedInputStream bis = null;
+ try {
+ bis = new BufferedInputStream(clientResponse.getEntityInputStream());
+ IOUtils.copy(bis, output);
+ } finally {
+ IOUtils.closeQuietly(bis);
+ }
+ }
+ });
+ } else {
+ responseBuilder.entity(updatedEntity);
+ }
+ }
+
+ return responseBuilder.build();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("NodeResponse[nodeUri=").append(nodeId.getApiAddress()).append(":").append(nodeId.getApiPort()).append(",")
+ .append("method=").append(httpMethod)
+ .append(",URI=").append(requestUri)
+ .append(",ResponseCode=").append(getStatus())
+ .append(",Duration=").append(TimeUnit.MILLISECONDS.convert(requestDurationNanos, TimeUnit.NANOSECONDS)).append(" ms]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
new file mode 100644
index 0000000..49bcd35
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ *
+ */
+public class BlockedByFirewallException extends ClusterException {
+
+ private final NodeIdentifier nodeId;
+ private final boolean isExistingNode;
+
+ public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, String msg, Throwable cause) {
+ super(msg, cause);
+ this.nodeId = nodeId;
+ this.isExistingNode = isExistingNode;
+ }
+
+ public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, Throwable cause) {
+ super(cause);
+ this.nodeId = nodeId;
+ this.isExistingNode = isExistingNode;
+ }
+
+ public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode, String msg) {
+ super(msg);
+ this.nodeId = nodeId;
+ this.isExistingNode = isExistingNode;
+ }
+
+ public BlockedByFirewallException(NodeIdentifier nodeId, boolean isExistingNode) {
+ this.nodeId = nodeId;
+ this.isExistingNode = isExistingNode;
+ }
+
+ public NodeIdentifier getNodeId() {
+ return nodeId;
+ }
+
+ public boolean isExistingNode() {
+ return isExistingNode;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
new file mode 100644
index 0000000..3bf9752
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * The base exception class for cluster related exceptions.
+ *
+ * @author unattributed
+ */
+public class ClusterException extends RuntimeException {
+
+ public ClusterException() {
+ }
+
+ public ClusterException(String msg) {
+ super(msg);
+ }
+
+ public ClusterException(Throwable cause) {
+ super(cause);
+ }
+
+ public ClusterException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java
new file mode 100644
index 0000000..365b5f0
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ConnectingNodeMutableRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a HTTP request that may change a node's
+ * dataflow is to be replicated while a node is connecting to the cluster.
+ *
+ * @author unattributed
+ */
+public class ConnectingNodeMutableRequestException extends MutableRequestException {
+
+ public ConnectingNodeMutableRequestException() {
+ }
+
+ public ConnectingNodeMutableRequestException(String msg) {
+ super(msg);
+ }
+
+ public ConnectingNodeMutableRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ public ConnectingNodeMutableRequestException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}