You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/31 04:43:55 UTC
[18/62] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
deleted file mode 100644
index 339d904..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-import java.util.Set;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * A service for managing the cluster's flow. The service will attempt to keep
- * the cluster's dataflow current while respecting the value of the configured
- * retrieval delay.
- *
- * The eligible retrieval time is reset with the configured delay every time the
- * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then
- * the flow will not be retrieved.
- *
- * Clients must call start() and stop() to initialize and stop the instance.
- *
- * @author unattributed
- */
-public interface DataFlowManagementService {
-
- /**
- * Starts the instance. Start may only be called if the instance is not
- * running.
- */
- void start();
-
- /**
- * Stops the instance. Stop may only be called if the instance is running.
- */
- void stop();
-
- /**
- * @return true if the instance is started; false otherwise.
- */
- boolean isRunning();
-
- /**
- * Loads the dataflow.
- *
- * @return the dataflow or null if no dataflow exists
- */
- ClusterDataFlow loadDataFlow();
-
- /**
- * Updates the dataflow with the given primary node identifier.
- *
- * @param nodeId the node identifier
- *
- * @throws DaoException if the update failed
- */
- void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
-
- /**
- * Sets the state of the flow.
- *
- * @param flowState the state
- *
- * @see PersistedFlowState
- */
- void setPersistedFlowState(PersistedFlowState flowState);
-
- /**
- * @return the state of the flow
- */
- PersistedFlowState getPersistedFlowState();
-
- /**
- * @return true if the flow is current; false otherwise.
- */
- boolean isFlowCurrent();
-
- /**
- * Sets the node identifiers to use when attempting to retrieve the flow.
- *
- * @param nodeIds the node identifiers
- */
- void setNodeIds(Set<NodeIdentifier> nodeIds);
-
- /**
- * Returns the set of node identifiers the service is using to retrieve the
- * flow.
- *
- * @return the set of node identifiers the service is using to retrieve the
- * flow.
- */
- Set<NodeIdentifier> getNodeIds();
-
- /**
- * @return the retrieval delay in seconds
- */
- int getRetrievalDelaySeconds();
-
- /**
- * Sets the retrieval delay.
- *
- * @param delay the retrieval delay in seconds
- */
- void setRetrievalDelay(String delay);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
deleted file mode 100644
index b3afc6e..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * Represents the various states of a flow managed by the cluster.
- *
- * The semantics of the values are:
- * <ul>
- * <li> CURRENT - the flow is current </li>
- * <li> STALE - the flow is not current, but is eligible to be updated. </li>
- * <li> UNKNOWN - the flow is not current and is not eligible to be updated.
- * </li>
- * </ul>
- *
- * @author unattributed
- */
-public enum PersistedFlowState {
-
- CURRENT,
- STALE,
- UNKNOWN
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
deleted file mode 100644
index ce5a08b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * Represents the exceptional case when a caller is requesting the current flow,
- * but a current flow is not available.
- *
- * @author unattributed
- */
-public class StaleFlowException extends RuntimeException {
-
- public StaleFlowException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public StaleFlowException(String message) {
- super(message);
- }
-
- public StaleFlowException(Throwable cause) {
- super(cause);
- }
-
- public StaleFlowException() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
deleted file mode 100644
index 72b594a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.UUID;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.nifi.cluster.flow.ClusterDataFlow;
-import org.apache.nifi.cluster.flow.DaoException;
-import org.apache.nifi.cluster.flow.DataFlowDao;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.protocol.DataFlow;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.file.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
-/**
- * Implements the DataFlowDao interface. The implementation tracks the state of the
- * dataflow by annotating the filename of the flow state file. Specifically, the
- * implementation correlates PersistedFlowState states to filename extensions.
- * The correlation is as follows:
- * <ul>
- * <li> CURRENT maps to flow.tar </li>
- * <li> STALE maps to flow.tar.stale </li>
- * <li> UNKNOWN maps to flow.tar.unknown </li>
- * </ul>
- * Whenever the flow state changes, the flow state file's name is updated to
- * denote its state.
- *
- * The implementation also provides for a restore directory that may be
- * configured for higher availability. At instance creation, if the primary or
- * restore directories have multiple flow state files, an exception is thrown.
- * If the primary directory has a current flow state file, but the restore
- * directory does not, then the primary flow state file is copied to the restore
- * directory. If the restore directory has a current flow state file, but the
- * primary directory does not, then the restore flow state file is copied to the
- * primary directory. If both the primary and restore directories have a current
- * flow state file and the files are different, then an exception is thrown.
- *
- * When the flow state file is saved, it is always saved first to the restore
- * directory followed by a save to the primary directory. When the flow state
- * file is loaded, a check is made to verify that the primary and restore flow
- * state files are both current. If either is not current, then an exception is
- * thrown. The primary flow state file is always read when the load method is
- * called.
- *
- * @author unattributed
- */
-public class DataFlowDaoImpl implements DataFlowDao {
-
- private final File primaryDirectory;
- private final File restoreDirectory;
- private final boolean autoStart;
- private final String generatedRootGroupId = UUID.randomUUID().toString();
-
- public static final String STALE_EXT = ".stale";
- public static final String UNKNOWN_EXT = ".unknown";
- public static final String FLOW_PACKAGE = "flow.tar";
- public static final String FLOW_XML_FILENAME = "flow.xml";
- public static final String TEMPLATES_FILENAME = "templates.xml";
- public static final String SNIPPETS_FILENAME = "snippets.xml";
- public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
-
- private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
-
- public DataFlowDaoImpl(final File primaryDirectory) throws DaoException {
- this(primaryDirectory, null, false);
- }
-
- public DataFlowDaoImpl(final File primaryDirectory, final File restoreDirectory, final boolean autoStart) throws DaoException {
-
- // sanity check that primary directory is a directory, creating it if necessary
- if (primaryDirectory == null) {
- throw new IllegalArgumentException("Primary directory may not be null.");
- } else if (!primaryDirectory.exists()) {
- if (!primaryDirectory.mkdir()) {
- throw new DaoException(String.format("Failed to create primary directory '%s'", primaryDirectory.getAbsolutePath()));
- }
- } else if (!primaryDirectory.isDirectory()) {
- throw new IllegalArgumentException("Primary directory must be a directory.");
- }
-
- this.autoStart = autoStart;
-
- try {
- this.primaryDirectory = primaryDirectory;
- this.restoreDirectory = restoreDirectory;
-
- if (restoreDirectory == null) {
- // check that the single primary state file is current, creating a new one if none exists
- ensureSingleCurrentStateFile(primaryDirectory);
- } else {
-
- // check that restore directory is a directory, creating it if necessary
- FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
-
- // check that restore directory is not the same as the primary directory
- if (primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
- throw new IllegalArgumentException(String.format("Primary directory '%s' is the same as restore directory '%s' ",
- primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
- }
-
- final File[] primaryFlowStateFiles = getFlowStateFiles(primaryDirectory);
- final File[] restoreFlowStateFiles = getFlowStateFiles(restoreDirectory);
-
- // if more than one state file in either primary or restore, then throw exception
- if (primaryFlowStateFiles.length > 1) {
- throw new IllegalStateException(String.format("Found multiple dataflow state files in primary directory '%s'", primaryDirectory));
- } else if (restoreFlowStateFiles.length > 1) {
- throw new IllegalStateException(String.format("Found multiple dataflow state files in restore directory '%s'", restoreDirectory));
- }
-
- // check that the single primary state file we found is current or create a new one
- final File primaryFlowStateFile = ensureSingleCurrentStateFile(primaryDirectory);
-
- // check that the single restore state file we found is current or create a new one
- final File restoreFlowStateFile = ensureSingleCurrentStateFile(restoreDirectory);
-
- // if there was a difference in flow state file directories, then copy the appropriate files
- if (restoreFlowStateFiles.length == 0 && primaryFlowStateFiles.length != 0) {
- // copy primary state file to restore
- FileUtils.copyFile(primaryFlowStateFile, restoreFlowStateFile, false, false, logger);
- } else if (primaryFlowStateFiles.length == 0 && restoreFlowStateFiles.length != 0) {
- // copy restore state file to primary
- FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger);
- } else {
- // verify that the primary copy matches the restore copy; a mismatch is an error
- syncWithRestore(primaryFlowStateFile, restoreFlowStateFile);
- }
-
- }
- } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) {
- throw new DaoException(ex);
- }
- }
-
-
- private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException {
- try (final FileInputStream primaryFis = new FileInputStream(primaryFile);
- final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis);
- final FileInputStream restoreFis = new FileInputStream(restoreFile);
- final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) {
-
- final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
- final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
-
- if ( primaryEntry == null && restoreEntry == null ) {
- return;
- }
-
- if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) {
- throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
- primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
- }
-
- final byte[] primaryMd5 = calculateMd5(primaryIn);
- final byte[] restoreMd5 = calculateMd5(restoreIn);
-
- if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
- throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
- primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
- }
- }
- }
-
- private byte[] calculateMd5(final InputStream in) throws IOException {
- final MessageDigest digest;
- try {
- digest = MessageDigest.getInstance("MD5");
- } catch (final NoSuchAlgorithmException nsae) {
- throw new IOException(nsae);
- }
-
- int len;
- final byte[] buffer = new byte[8192];
- while ((len = in.read(buffer)) > -1) {
- if (len > 0) {
- digest.update(buffer, 0, len);
- }
- }
- return digest.digest();
- }
-
- @Override
- public ClusterDataFlow loadDataFlow() throws DaoException {
- try {
- return parseDataFlow(getExistingFlowStateFile(primaryDirectory));
- } catch (final IOException | JAXBException ex) {
- throw new DaoException(ex);
- }
- }
-
- @Override
- public void saveDataFlow(final ClusterDataFlow dataFlow) throws DaoException {
- try {
-
- final File primaryStateFile = getFlowStateFile(primaryDirectory);
-
- // write to restore before writing to primary in case primary experiences problems
- if (restoreDirectory != null) {
- final File restoreStateFile = getFlowStateFile(restoreDirectory);
- if (restoreStateFile == null) {
- if (primaryStateFile == null) {
- writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
- } else {
- throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'",
- primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
- }
- } else {
- if (primaryStateFile == null) {
- throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'",
- restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath()));
- } else {
- final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile);
- final PersistedFlowState restoreFlowState = getPersistedFlowState(restoreStateFile);
- if (primaryFlowState == restoreFlowState) {
- writeDataFlow(restoreStateFile, dataFlow);
- } else {
- throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
- primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
- }
- }
- }
- }
-
- // write dataflow to primary
- if (primaryStateFile == null) {
- writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow);
- } else {
- writeDataFlow(primaryStateFile, dataFlow);
- }
-
- } catch (final IOException | JAXBException ex) {
- throw new DaoException(ex);
- }
- }
-
- @Override
- public PersistedFlowState getPersistedFlowState() {
- // trust restore over primary if configured for restore
- if (restoreDirectory == null) {
- return getPersistedFlowState(getExistingFlowStateFile(primaryDirectory));
- } else {
- return getPersistedFlowState(getExistingFlowStateFile(restoreDirectory));
- }
- }
-
- @Override
- public void setPersistedFlowState(final PersistedFlowState flowState) throws DaoException {
- // rename restore before primary if configured for restore
- if (restoreDirectory != null) {
- renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), flowState);
- }
- renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), flowState);
- }
-
- private File ensureSingleCurrentStateFile(final File dir) throws IOException, JAXBException {
-
- // ensure that we have at most one state file and if we have one, it is current
- final File[] directoryFlowStateFiles = getFlowStateFiles(dir);
- if (directoryFlowStateFiles.length > 1) {
- throw new DaoException(String.format("Found multiple dataflow state files in directory '%s'", dir));
- } else if (directoryFlowStateFiles.length == 0) {
- // create a new file if none exist
- return createNewFlowStateFile(dir);
- } else {
- // check that the single flow state file is current
- final PersistedFlowState flowState = getPersistedFlowState(directoryFlowStateFiles[0]);
- if (PersistedFlowState.CURRENT == flowState) {
- return directoryFlowStateFiles[0];
- } else {
- throw new DaoException(String.format("Dataflow state file '%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath()));
- }
- }
-
- }
-
- private PersistedFlowState getPersistedFlowState(final File file) {
- final String path = file.getAbsolutePath();
- if (path.endsWith(STALE_EXT)) {
- return PersistedFlowState.STALE;
- } else if (path.endsWith(UNKNOWN_EXT)) {
- return PersistedFlowState.UNKNOWN;
- } else {
- return PersistedFlowState.CURRENT;
- }
- }
-
- private File getFlowStateFile(final File dir) {
- final File[] files = getFlowStateFiles(dir);
- if (files.length > 1) {
- throw new IllegalStateException(String.format("Expected at most one dataflow state file, but found %s files.", files.length));
- } else if (files.length == 0) {
- return null;
- } else {
- return files[0];
- }
- }
-
- private File getExistingFlowStateFile(final File dir) {
- final File file = getFlowStateFile(dir);
- if (file == null) {
- throw new IllegalStateException(String.format("Expected a dataflow state file, but none existed in directory '%s'", dir.getAbsolutePath()));
- }
- return file;
- }
-
- private File[] getFlowStateFiles(final File dir) {
- final File[] files = dir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT));
- }
- });
-
- if (files == null) {
- return new File[0];
- } else {
- return files;
- }
- }
-
- private File removeStateFileExtension(final File file) {
-
- final String path = file.getAbsolutePath();
- final int stateFileExtIndex;
- if (path.endsWith(STALE_EXT)) {
- stateFileExtIndex = path.lastIndexOf(STALE_EXT);
- } else if (path.endsWith(UNKNOWN_EXT)) {
- stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT);
- } else {
- stateFileExtIndex = path.length();
- }
-
- return new File(path.substring(0, stateFileExtIndex));
- }
-
- private File addStateFileExtension(final File file, final PersistedFlowState state) {
- switch (state) {
- case CURRENT: {
- return file;
- }
- case STALE: {
- return new File(file.getAbsolutePath() + STALE_EXT);
- }
- case UNKNOWN: {
- return new File(file.getAbsolutePath() + UNKNOWN_EXT);
- }
- default: {
- throw new RuntimeException("Unsupported PersistedFlowState Enum value: " + state);
- }
- }
- }
-
- private File createNewFlowStateFile(final File dir) throws IOException, JAXBException {
- final File stateFile = new File(dir, FLOW_PACKAGE);
- stateFile.createNewFile();
-
- final byte[] flowBytes = getEmptyFlowBytes();
- final byte[] templateBytes = new byte[0];
- final byte[] snippetBytes = new byte[0];
- final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
-
- final ClusterMetadata clusterMetadata = new ClusterMetadata();
- writeDataFlow(stateFile, dataFlow, clusterMetadata);
-
- return stateFile;
- }
-
- private byte[] getEmptyFlowBytes() throws IOException {
- try {
- final DocumentBuilder docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
- final Document document = docBuilder.newDocument();
-
- final Element controller = document.createElement("flowController");
- document.appendChild(controller);
-
- controller.appendChild(createTextElement(document, "maxThreadCount", "15"));
-
- final Element rootGroup = document.createElement("rootGroup");
- rootGroup.appendChild(createTextElement(document, "id", generatedRootGroupId));
- rootGroup.appendChild(createTextElement(document, "name", "NiFi Flow"));
-
- // create the position element
- final Element positionElement = createTextElement(document, "position", "");
- positionElement.setAttribute("x", "0.0");
- positionElement.setAttribute("y", "0.0");
- rootGroup.appendChild(positionElement);
-
- rootGroup.appendChild(createTextElement(document, "comment", ""));
- controller.appendChild(rootGroup);
-
- final Transformer transformer = TransformerFactory.newInstance().newTransformer();
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
- final DOMSource source = new DOMSource(document);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final StreamResult result = new StreamResult(baos);
- transformer.transform(source, result);
-
- return baos.toByteArray();
- } catch (final Exception e) {
- throw new IOException(e);
- }
- }
-
- private Element createTextElement(final Document document, final String elementName, final String value) {
- final Element element = document.createElement(elementName);
- element.setTextContent(value);
- return element;
- }
-
- private void renameFlowStateFile(final File flowStateFile, final PersistedFlowState newState) throws DaoException {
- final PersistedFlowState existingState = getPersistedFlowState(flowStateFile);
- if (existingState != newState) {
- final File newFlowStateFile = addStateFileExtension(removeStateFileExtension(flowStateFile), newState);
- if (flowStateFile.renameTo(newFlowStateFile) == false) {
- throw new DaoException(
- String.format("Failed to rename flow state file '%s' to new name '%s'", flowStateFile.getAbsolutePath(), newFlowStateFile.getAbsolutePath()));
- }
- }
- }
-
- private ClusterDataFlow parseDataFlow(final File file) throws IOException, JAXBException, DaoException {
- byte[] flowBytes = new byte[0];
- byte[] templateBytes = new byte[0];
- byte[] snippetBytes = new byte[0];
- byte[] clusterInfoBytes = new byte[0];
-
- try (final InputStream inStream = new FileInputStream(file);
- final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
- TarArchiveEntry tarEntry;
- while ((tarEntry = tarIn.getNextTarEntry()) != null) {
- switch (tarEntry.getName()) {
- case FLOW_XML_FILENAME:
- flowBytes = new byte[(int) tarEntry.getSize()];
- StreamUtils.fillBuffer(tarIn, flowBytes, true);
- break;
- case TEMPLATES_FILENAME:
- templateBytes = new byte[(int) tarEntry.getSize()];
- StreamUtils.fillBuffer(tarIn, templateBytes, true);
- break;
- case SNIPPETS_FILENAME:
- snippetBytes = new byte[(int) tarEntry.getSize()];
- StreamUtils.fillBuffer(tarIn, snippetBytes, true);
- break;
- case CLUSTER_INFO_FILENAME:
- clusterInfoBytes = new byte[(int) tarEntry.getSize()];
- StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
- break;
- default:
- throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
- }
- }
- }
-
- final ClusterMetadata clusterMetadata;
- if (clusterInfoBytes.length == 0) {
- clusterMetadata = null;
- } else {
- final Unmarshaller clusterMetadataUnmarshaller = ClusterMetadata.jaxbCtx.createUnmarshaller();
- clusterMetadata = (ClusterMetadata) clusterMetadataUnmarshaller.unmarshal(new ByteArrayInputStream(clusterInfoBytes));
- }
-
- final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
- dataFlow.setAutoStartProcessors(autoStart);
-
- return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
- }
-
- private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
-
- // get the data flow
- DataFlow dataFlow = clusterDataFlow.getDataFlow();
-
- // if no dataflow, then write a new dataflow
- if (dataFlow == null) {
- dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]);
- }
-
- // setup the cluster metadata
- final ClusterMetadata clusterMetadata = new ClusterMetadata();
- clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
-
- // write to disk
- writeDataFlow(file, dataFlow, clusterMetadata);
- }
-
- private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
- final TarArchiveEntry flowEntry = new TarArchiveEntry(filename);
- flowEntry.setSize(bytes.length);
- tarOut.putArchiveEntry(flowEntry);
- tarOut.write(bytes);
- tarOut.closeArchiveEntry();
- }
-
- private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
-
- try (final OutputStream fos = new FileOutputStream(file);
- final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
-
- writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
- writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
- writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
- writeClusterMetadata(clusterMetadata, baos);
- final byte[] clusterInfoBytes = baos.toByteArray();
-
- writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes);
- }
- }
-
- private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException {
- // write cluster metadata to output stream
- final Marshaller marshaller = ClusterMetadata.jaxbCtx.createMarshaller();
- marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
- marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
- marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
- marshaller.marshal(clusterMetadata, os);
- }
-
- @XmlRootElement(name = "clusterMetadata")
- private static class ClusterMetadata {
-
- private NodeIdentifier primaryNodeId;
-
- private static final JAXBContext jaxbCtx;
-
- static {
- try {
- jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class);
- } catch (final JAXBException je) {
- throw new RuntimeException(je);
- }
- }
-
- @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
- public NodeIdentifier getPrimaryNodeId() {
- return primaryNodeId;
- }
-
- public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) {
- this.primaryNodeId = primaryNodeId;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
deleted file mode 100644
index e135af3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow.impl;
-
-import java.util.Collections;
-import java.util.Date;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.nifi.cluster.flow.ClusterDataFlow;
-import org.apache.nifi.cluster.flow.DaoException;
-import org.apache.nifi.cluster.flow.DataFlowDao;
-import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.util.FormatUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the {@link DataFlowManagementService} interface. The service
- * tries to keep the cluster's flow current with regards to the available
- * nodes.
- *
- * The instance may be configured with a retrieval delay, which will reduce the
- * number of retrievals performed by the service at the expense of increasing
- * the chances that the service will not be able to provide a current flow to
- * the caller.
- *
- * By default, the service will try to update the flow as quickly as possible.
- * Configuring a delay enables a less aggressive retrieval strategy.
- * Specifically, the eligible retrieval time is reset every time the flow state
- * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow
- * will not be retrieved.
- *
- * @author unattributed
- */
-public class DataFlowManagementServiceImpl implements DataFlowManagementService {
-
-    /*
-     * Developer Note:
-     *
-     * This class maintains a Timer and a TimerTask.
-     * Although the class is not externally threadsafe, its internals are protected to
-     * accommodate multithread access between the callers and the TimerTask.
-     *
-     */
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
-
-    private final DataFlowDao flowDao;
-
-    private final ClusterManagerProtocolSender sender;
-
-    private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>();
-
-    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
-
-    // time of the last successful save of a retrieved flow; -1 until the first one
-    private final AtomicLong lastRetrievalTime = new AtomicLong(-1);
-
-    // non-null exactly while the service is running (see isRunning())
-    private Timer flowRetriever;
-
-    // epoch millis; only read and written while holding resourceLock
-    private long retrievableAfterTime = 0L;
-
-    private final AtomicInteger retrievalDelaySeconds = new AtomicInteger(0);
-
-    private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock());
-
-    /**
-     * Creates a stopped instance.
-     *
-     * @param flowDao the DAO used to load and save the flow and its state
-     * @param sender the sender used to request the flow from nodes
-     *
-     * @throws IllegalArgumentException if either argument is null
-     */
-    public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) {
-        if (flowDao == null) {
-            throw new IllegalArgumentException("Flow DAO may not be null.");
-        } else if (sender == null) {
-            throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
-        }
-        this.flowDao = flowDao;
-        this.sender = sender;
-    }
-
-    /**
-     * Starts a daemon timer that runs the flow retrieval task immediately and
-     * then every 500 milliseconds.
-     *
-     * @throws IllegalArgumentException if the instance is already running
-     */
-    @Override
-    public void start() {
-
-        if (isRunning()) {
-            throw new IllegalArgumentException("Instance is already running.");
-        }
-
-        // reset stop requested
-        stopRequested.set(false);
-
-        // setup flow retriever timer
-        flowRetriever = new Timer("Flow Management Service", /* is daemon */ true);
-        flowRetriever.schedule(new FlowRetrieverTimerTask(), 0, 500);
-    }
-
-    /**
-     * @return true if the retrieval timer exists; false otherwise
-     */
-    @Override
-    public boolean isRunning() {
-        return (flowRetriever != null);
-    }
-
-    /**
-     * Records the stop request, cancels the retrieval timer, and discards it.
-     *
-     * @throws IllegalArgumentException if the instance is not running
-     */
-    @Override
-    public void stop() {
-
-        if (isRunning() == false) {
-            throw new IllegalArgumentException("Instance is already stopped.");
-        }
-
-        // record stop request
-        stopRequested.set(true);
-
-        flowRetriever.cancel();
-        flowRetriever = null;
-
-    }
-
-    /**
-     * Loads the cluster data flow from the DAO while holding the resource
-     * lock.
-     *
-     * @return whatever the DAO returns for the stored flow
-     *
-     * @throws DaoException if the DAO fails to load the flow
-     */
-    @Override
-    public ClusterDataFlow loadDataFlow() throws DaoException {
-        resourceLock.lock();
-        try {
-            return flowDao.loadDataFlow();
-        } finally {
-            resourceLock.unlock("loadDataFlow");
-        }
-    }
-
-    /**
-     * Saves a cluster data flow that pairs the currently stored data flow (or
-     * null if the DAO has none) with the given primary node identifier.
-     *
-     * @param nodeId the identifier of the new primary node
-     */
-    @Override
-    public void updatePrimaryNode(final NodeIdentifier nodeId) {
-        resourceLock.lock();
-        try {
-            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
-
-            final StandardDataFlow dataFlow;
-            if (existingClusterDataFlow == null) {
-                dataFlow = null;
-            } else {
-                dataFlow = existingClusterDataFlow.getDataFlow();
-            }
-
-            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
-        } finally {
-            resourceLock.unlock("updatePrimaryNode");
-        }
-    }
-
-    /**
-     * @return the persisted flow state as reported by the DAO
-     */
-    @Override
-    public PersistedFlowState getPersistedFlowState() {
-        resourceLock.lock();
-        try {
-            return flowDao.getPersistedFlowState();
-        } finally {
-            resourceLock.unlock("getPersistedFlowState");
-        }
-    }
-
-    /**
-     * @return true if the persisted flow state is CURRENT; false otherwise
-     */
-    @Override
-    public boolean isFlowCurrent() {
-        return PersistedFlowState.CURRENT == getPersistedFlowState();
-    }
-
-    /**
-     * Persists the given flow state and updates the time after which the flow
-     * may be retrieved: STALE makes the flow retrievable once the configured
-     * delay has elapsed; UNKNOWN and CURRENT disable retrieval.
-     *
-     * @param flowState the new flow state
-     */
-    @Override
-    public void setPersistedFlowState(final PersistedFlowState flowState) {
-        // lock to ensure state change and retrievable time update are atomic
-        resourceLock.lock();
-        try {
-            flowDao.setPersistedFlowState(flowState);
-            if (PersistedFlowState.STALE == flowState) {
-                // convert in long arithmetic; an int multiplication by 1000 overflows for large delays
-                retrievableAfterTime = new Date().getTime() + TimeUnit.SECONDS.toMillis(getRetrievalDelaySeconds());
-            } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) {
-                retrievableAfterTime = Long.MAX_VALUE;
-            }
-        } finally {
-            resourceLock.unlock("setPersistedFlowState");
-        }
-    }
-
-    /**
-     * @return an unmodifiable view of the node identifiers used for retrieval
-     */
-    @Override
-    public Set<NodeIdentifier> getNodeIds() {
-        return Collections.unmodifiableSet(nodeIds);
-    }
-
-    /**
-     * Replaces the node identifiers used for retrieval. Nothing is changed if
-     * the given set equals the current one.
-     *
-     * @param nodeIds the new node identifiers
-     *
-     * @throws IllegalArgumentException if the given set is null
-     */
-    @Override
-    public void setNodeIds(final Set<NodeIdentifier> nodeIds) {
-
-        if (nodeIds == null) {
-            throw new IllegalArgumentException("Node IDs may not be null.");
-        }
-
-        resourceLock.lock();
-        try {
-
-            if (this.nodeIds.equals(nodeIds)) {
-                return;
-            }
-
-            this.nodeIds.clear();
-            this.nodeIds.addAll(nodeIds);
-
-        } finally {
-            resourceLock.unlock("setNodeIds");
-        }
-
-    }
-
-    /**
-     * @return the configured retrieval delay in seconds
-     */
-    @Override
-    public int getRetrievalDelaySeconds() {
-        return retrievalDelaySeconds.get();
-    }
-
-    /**
-     * Sets the retrieval delay from a duration string parsed by
-     * {@link FormatUtils#getTimeDuration}. Durations larger than
-     * {@link Integer#MAX_VALUE} seconds are clamped to that value.
-     *
-     * @param retrievalDelay the delay as a duration string
-     */
-    @Override
-    public void setRetrievalDelay(final String retrievalDelay) {
-        // clamp before narrowing so that an oversized duration cannot wrap around
-        this.retrievalDelaySeconds.set((int) Math.min(Integer.MAX_VALUE, FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS)));
-    }
-
-    /**
-     * @return the sender used to request the flow from nodes
-     */
-    public ClusterManagerProtocolSender getSender() {
-        return sender;
-    }
-
-    /**
-     * @return the time in epoch milliseconds at which a retrieved flow was
-     * last saved, or -1 if no flow has been saved yet
-     */
-    public long getLastRetrievalTime() {
-        return lastRetrievalTime.get();
-    }
-
-    /**
-     * A timer task for issuing FlowRequestMessage messages to nodes to retrieve
-     * an updated flow.
-     */
-    private class FlowRetrieverTimerTask extends TimerTask {
-
-        @Override
-        public void run() {
-
-            resourceLock.lock();
-            try {
-                // if flow is current, then we're done
-                if (isFlowCurrent()) {
-                    return;
-                }
-            } catch (final Exception ex) {
-                // a failed check does not abort the run; retrieval is still attempted below
-                logger.info("Encountered exception checking if flow is current caused by " + ex, ex);
-            } finally {
-                resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
-            }
-
-            // NOTE(review): the loop keeps requesting from the remaining nodes after a
-            // flow has been saved - confirm that this is intended
-            final FlowRequestMessage request = new FlowRequestMessage();
-            for (final NodeIdentifier nodeId : getNodeIds()) {
-                try {
-                    // setup request
-                    request.setNodeId(nodeId);
-
-                    // record request time
-                    final long requestSentTime = new Date().getTime();
-
-                    resourceLock.lock();
-                    try {
-                        // sanity checks before making request
-                        if (stopRequested.get()) { // did we receive a stop request
-                            logger.debug("Stopping runnable prematurely because a request to stop was issued.");
-                            return;
-                        } else if (requestSentTime < retrievableAfterTime) {
-                            /*
-                             * Retrievable after time was updated while obtaining
-                             * the lock, so try again later
-                             */
-                            return;
-                        }
-                    } finally {
-                        resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
-                    }
-
-                    // send request; the lock is not held while waiting for the node
-                    final FlowResponseMessage response = sender.requestFlow(request);
-
-                    resourceLock.lock();
-                    try {
-                        // check if the retrieved flow is still valid
-                        if (requestSentTime > retrievableAfterTime) {
-                            logger.info("Saving retrieved flow.");
-
-                            // keep the stored primary node, if any, and replace only the data flow
-                            final StandardDataFlow dataFlow = response.getDataFlow();
-                            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
-                            final ClusterDataFlow currentClusterDataFlow;
-                            if (existingClusterDataFlow == null) {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
-                            } else {
-                                currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
-                            }
-                            flowDao.saveDataFlow(currentClusterDataFlow);
-                            flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
-                            lastRetrievalTime.set(new Date().getTime());
-                        }
-
-                        /*
-                         * Otherwise the retrievable after time was updated while
-                         * requesting the flow, so try again later.
-                         */
-                    } finally {
-                        resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
-                    }
-
-                } catch (final Throwable t) {
-                    // caught broadly: an exception escaping run() would terminate the Timer thread
-                    logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t);
-                }
-            }
-        }
-    }
-
-    /**
-     * Wraps a lock and logs, at debug level, whenever more than 100
-     * milliseconds elapsed between a thread's most recent lock() call and its
-     * unlock() call.
-     */
-    private static class TimingReentrantLock {
-
-        private final Lock lock;
-        private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock");
-
-        // per-thread nano time of the most recent lock() call
-        private final ThreadLocal<Long> lockTime = new ThreadLocal<>();
-
-        public TimingReentrantLock(final Lock lock) {
-            this.lock = lock;
-        }
-
-        /**
-         * Acquires the wrapped lock and records the acquisition time for the
-         * calling thread.
-         */
-        public void lock() {
-            lock.lock();
-            lockTime.set(System.nanoTime());
-        }
-
-        /**
-         * Releases the wrapped lock and reports how long it was held.
-         *
-         * @param task a label for the work done under the lock, used in the
-         * log message
-         */
-        public void unlock(final String task) {
-            // no recorded time means this thread never called lock(); measure nothing
-            // and let the wrapped lock report the misuse instead of failing on unboxing
-            final Long lockedAt = lockTime.get();
-            final long nanosLocked = (lockedAt == null) ? 0L : System.nanoTime() - lockedAt;
-            lock.unlock();
-
-            final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS);
-            if (millisLocked > 100L) {
-                logger.debug("Lock held for {} milliseconds for task: {}", millisLocked, task);
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
deleted file mode 100644
index 0fcac8c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager;
-
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.NodeInformant;
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.node.Node;
-import org.apache.nifi.cluster.node.Node.Status;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.reporting.BulletinRepository;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * Defines the interface for a ClusterManager. The cluster manager is a
- * threadsafe centralized manager for a cluster. Members of a cluster are nodes.
- * A member becomes a node by issuing a connection request to the manager. The
- * manager maintains the set of nodes. Nodes may be disconnected, reconnected,
- * and deleted.
- *
- * Nodes are responsible for sending heartbeats to the manager to indicate their
- * liveness. A manager may disconnect a node if it does not receive a
- * heartbeat within a configurable time period. A cluster manager instance may
- * be configured with how often to monitor received heartbeats
- * (getHeartbeatMonitoringIntervalSeconds()) and the maximum time that may
- * elapse between node heartbeats before disconnecting the node
- * (getMaxHeartbeatGapSeconds()).
- *
- * Since only a single node may execute isolated processors, the cluster manager
- * maintains the notion of a primary node. The primary node is chosen at cluster
- * startup and retains the role until a user requests a different node to be the
- * primary node.
- *
- * @author unattributed
- */
-public interface ClusterManager extends NodeInformant {
-
- /**
- * Handles a node's heartbeat.
- *
- * @param heartbeat a heartbeat
- *
- */
- void handleHeartbeat(Heartbeat heartbeat);
-
- /**
- * @param statuses the statuses of the nodes
- * @return the set of nodes
- */
- Set<Node> getNodes(Status... statuses);
-
- /**
- * @param nodeId the node identifier
- * @return returns the node with the given identifier or null if node does
- * not exist
- */
- Node getNode(String nodeId);
-
- /**
- * @param statuses the statuses of the nodes
- * @return the set of node identifiers with the given node status
- */
- Set<NodeIdentifier> getNodeIds(Status... statuses);
-
- /**
- * Deletes the node with the given node identifier. If the given node is the
- * primary node, then a subsequent request may be made to the manager to set
- * a new primary node.
- *
- * @param nodeId the node identifier
- * @param userDn the Distinguished Name of the user requesting the node be
- * deleted from the cluster
- *
- * @throws UnknownNodeException if the node does not exist
- * @throws IllegalNodeDeletionException if the node is not in a disconnected
- * state
- */
- void deleteNode(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDeletionException;
-
- /**
- * Requests a connection to the cluster.
- *
- * @param request the request
- *
- * @return the response
- */
- ConnectionResponse requestConnection(ConnectionRequest request);
-
- /**
- * Services reconnection requests for a given node. If the node indicates
- * reconnection failure, then the node will be set to disconnected.
- * Otherwise, a reconnection request will be sent to the node, initiating
- * the connection handshake.
- *
- * @param nodeId a node identifier
- * @param userDn the Distinguished Name of the user requesting the
- * reconnection
- *
- * @throws UnknownNodeException if the node does not exist
- * @throws IllegalNodeReconnectionException if the node is not disconnected
- */
- void requestReconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeReconnectionException;
-
- /**
- * Requests the node with the given identifier be disconnected.
- *
- * @param nodeId the node identifier
- * @param userDn the Distinguished Name of the user requesting the
- * disconnection
- *
- * @throws UnknownNodeException if the node does not exist
- * @throws IllegalNodeDisconnectionException if the node cannot be
- * disconnected due to the cluster's state (e.g., node is last connected
- * node or node is primary)
- * @throws NodeDisconnectionException if the disconnection failed
- */
- void requestDisconnection(String nodeId, String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException;
-
- /**
- * @return the time in seconds to wait between successive executions of
- * heartbeat monitoring
- */
- int getHeartbeatMonitoringIntervalSeconds();
-
- /**
- * @return the maximum time in seconds that is allowed between successive
- * heartbeats of a node before disconnecting the node
- */
- int getMaxHeartbeatGapSeconds();
-
- /**
- * Returns a list of node events for the node with the given identifier. The
- * events will be returned in order of most recent to least recent according
- * to the creation date of the event.
- *
- * @param nodeId the node identifier
- *
- * @return the list of events or an empty list if no node exists with the
- * given identifier
- */
- List<Event> getNodeEvents(final String nodeId);
-
- /**
- * Revokes the primary role from the current primary node and assigns the
- * primary role to the given node ID.
- *
- * If role revocation fails, then the current primary node is set to
- * disconnected while retaining the primary role and no role assignment is
- * performed.
- *
- * If role assignment fails, then the given node is set to disconnected and
- * is given the primary role.
- *
- * @param nodeId the node identifier
- * @param userDn the Distinguished Name of the user requesting that the
- * Primary Node be assigned
- *
- * @throws UnknownNodeException if the node with the given identifier does
- * not exist
- * @throws IneligiblePrimaryNodeException if the node with the given
- * identifier is not eligible to be the primary node
- * @throws PrimaryRoleAssignmentException if the cluster was unable to
- * change the primary role to the requested node
- */
- void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
-
- /**
- * @return the primary node of the cluster or null if no primary node exists
- */
- Node getPrimaryNode();
-
- /**
- * Returns the bulletin repository.
- *
- * @return the bulletin repository
- */
- BulletinRepository getBulletinRepository();
-
- /**
- * Returns a {@link ProcessGroupStatus} that represents the status of all
- * nodes with the given {@link Status}es for the given ProcessGroup id, or
- * null if no nodes exist with the given statuses
- *
- * NOTE(review): the signature accepts only a group id and no statuses;
- * confirm which node statuses are taken into account.
- *
- * @param groupId the ProcessGroup id
- * @return the process group status, or null as described above
- */
- ProcessGroupStatus getProcessGroupStatus(String groupId);
-
- /**
- * Returns a merged representation of the System Diagnostics for all nodes
- * in the cluster
- *
- * @return the merged system diagnostics
- */
- SystemDiagnostics getSystemDiagnostics();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
deleted file mode 100644
index 2cf5812..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpClusterManager.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager;
-
-import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.UriConstructionException;
-import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
-import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
-import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
-import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Extends the ClusterManager interface to define how requests issued to the
- * cluster manager are federated to the nodes. Specifically, the HTTP protocol
- * is used for communicating requests to the cluster manager and to the nodes.
- *
- * @author unattributed
- */
-public interface HttpClusterManager extends ClusterManager {
-
- /**
- * Federates the HTTP request to all connected nodes in the cluster. The
- * given URI's host and port will not be used and instead will be adjusted
- * for each node's host and port. The node URIs are guaranteed to be
- * constructed before issuing any requests, so if a UriConstructionException
- * is thrown, then it is guaranteed that no request was issued.
- *
- * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
- * @param uri the base request URI (up to, but not including, the query
- * string)
- * @param parameters the request parameters
- * @param headers the request headers
- *
- * @return the client response
- *
- * @throws NoConnectedNodesException if no nodes are connected as a result
- * of the request
- * @throws NoResponseFromNodesException if no response could be obtained
- * @throws UriConstructionException if there was an issue constructing the
- * URIs tailored for each individual node
- * @throws ConnectingNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is connecting to the cluster
- * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is disconnected from the cluster
- * @throws SafeModeMutableRequestException if the request was a PUT, POST,
- * DELETE and the cluster is in safe mode
- */
- NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers)
- throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
- DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
-
- /**
- * Federates the HTTP request to the nodes specified. The given URI's host
- * and port will not be used and instead will be adjusted for each node's
- * host and port. The node URIs are guaranteed to be constructed before
- * issuing any requests, so if a UriConstructionException is thrown, then it
- * is guaranteed that no request was issued.
- *
- * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
- * @param uri the base request URI (up to, but not including, the query
- * string)
- * @param parameters the request parameters
- * @param headers the request headers
- * @param nodeIdentifiers the NodeIdentifier for each node that the request
- * should be sent to
- *
- * @return the client response
- *
- * @throws NoConnectedNodesException if no nodes are connected as a result
- * of the request
- * @throws NoResponseFromNodesException if no response could be obtained
- * @throws UriConstructionException if there was an issue constructing the
- * URIs tailored for each individual node
- * @throws ConnectingNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is connecting to the cluster
- * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is disconnected from the cluster
- * @throws SafeModeMutableRequestException if the request was a PUT, POST,
- * DELETE and the cluster is in safe mode
- */
- NodeResponse applyRequest(String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers,
- Set<NodeIdentifier> nodeIdentifiers)
- throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
- DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
-
- /**
- * Federates the HTTP request to all connected nodes in the cluster. The
- * given URI's host and port will not be used and instead will be adjusted
- * for each node's host and port. The node URIs are guaranteed to be
- * constructed before issuing any requests, so if a UriConstructionException
- * is thrown, then it is guaranteed that no request was issued.
- *
- * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
- * @param uri the base request URI (up to, but not including, the query
- * string)
- * @param entity the HTTP request entity
- * @param headers the request headers
- *
- * @return the client response
- *
- * @throws NoConnectedNodesException if no nodes are connected as a result
- * of the request
- * @throws NoResponseFromNodesException if no response could be obtained
- * @throws UriConstructionException if there was an issue constructing the
- * URIs tailored for each individual node
- * @throws ConnectingNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is connecting to the cluster
- * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is disconnected from the cluster
- * @throws SafeModeMutableRequestException if the request was a PUT, POST,
- * DELETE and the cluster is in safe mode
- */
- NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers)
- throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
- DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
-
- /**
- * Federates the HTTP request to the nodes specified. The given URI's host
- * and port will not be used and instead will be adjusted for each node's
- * host and port. The node URIs are guaranteed to be constructed before
- * issuing any requests, so if a UriConstructionException is thrown, then it
- * is guaranteed that no request was issued.
- *
- * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD)
- * @param uri the base request URI (up to, but not including, the query
- * string)
- * @param entity the HTTP request entity
- * @param headers the request headers
- * @param nodeIdentifiers the NodeIdentifier for each node that the request
- * should be sent to
- *
- * @return the client response
- *
- * @throws NoConnectedNodesException if no nodes are connected as a result
- * of the request
- * @throws NoResponseFromNodesException if no response could be obtained
- * @throws UriConstructionException if there was an issue constructing the
- * URIs tailored for each individual node
- * @throws ConnectingNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is connecting to the cluster
- * @throws DisconnectedNodeMutableRequestException if the request was a PUT,
- * POST, DELETE and a node is disconnected from the cluster
- * @throws SafeModeMutableRequestException if the request was a PUT, POST,
- * DELETE and the cluster is in safe mode
- */
- NodeResponse applyRequest(String method, URI uri, Object entity, Map<String, String> headers, Set<NodeIdentifier> nodeIdentifiers)
- throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException,
- DisconnectedNodeMutableRequestException, SafeModeMutableRequestException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
deleted file mode 100644
index fb57622..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpRequestReplicator.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager;
-
-import org.apache.nifi.cluster.manager.exception.UriConstructionException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * A service for managing the replication of requests to nodes. It is up to the
- * implementing class to decide if requests are sent concurrently or serially.
- *
- * Clients must call start() and stop() to initialize and shutdown the instance.
- * The instance must be started before issuing any replication requests.
- *
- * @author unattributed
- */
-public interface HttpRequestReplicator {
-
- /**
- * Starts the instance for replicating requests. Start may only be called if
- * the instance is not running.
- */
- void start();
-
- /**
- * Stops the instance from replicating requests. Stop may only be called if
- * the instance is running.
- */
- void stop();
-
- /**
- * @return true if the instance is started; false otherwise.
- */
- boolean isRunning();
-
- /**
- * Requests are sent to each of the given nodes. If the request results in
- * an exception, then the NodeResponse will contain the exception.
- *
- * HTTP DELETE and OPTIONS methods must supply an empty parameters map or
- * else an IllegalArgumentException is thrown.
- *
- * @param nodeIds the node identifiers
- * @param method the HTTP method (e.g., GET, POST, PUT, DELETE, HEAD,
- * OPTIONS)
- * @param uri the base request URI (up to, but not including, the query
- * string)
- * @param parameters any request parameters
- * @param headers any HTTP headers
- *
- * @return the set of node responses
- *
- * @throws UriConstructionException if a request for a node failed to be
- * constructed from the given prototype URI. If thrown, it is guaranteed
- * that no request was sent.
- */
- Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Map<String, List<String>> parameters, Map<String, String> headers) throws UriConstructionException;
-
- /**
- * Requests are sent to each of the given nodes. If the request results in
- * an exception, then the NodeResponse will contain the exception.
- *
- * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an
- * IllegalArgumentException if used.
- *
- * @param nodeIds the node identifiers
- * @param method the HTTP method (e.g., POST, PUT)
- * @param uri the base request URI (up to, but not including, the query
- * string)
- * @param entity an entity
- * @param headers any HTTP headers
- *
- * @return the set of node responses
- *
- * @throws UriConstructionException if a request for a node failed to be
- * constructed from the given prototype URI. If thrown, it is guaranteed
- * that no request was sent.
- */
- Set<NodeResponse> replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) throws UriConstructionException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
deleted file mode 100644
index 843a666..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/HttpResponseMapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager;
-
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-import org.apache.nifi.cluster.node.Node.Status;
-
-/**
- * Maps an HTTP response to a node status.
- *
- * @author unattributed
- */
-public interface HttpResponseMapper {
-
- /**
- * Maps each HTTP node response to the corresponding node status.
- *
- * @param requestURI the original request URI
- * @param nodeResponses a set of node responses
- *
- * @return a map associating the node response to the node status
- */
- Map<NodeResponse, Status> map(URI requestURI, Set<NodeResponse> nodeResponses);
-
-}