You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:32 UTC
[42/49] incubator-nifi git commit: NIFI-271 checkpoint
NIFI-271 checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9dda16c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9dda16c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9dda16c9
Branch: refs/heads/develop
Commit: 9dda16c995ba55b8025506a12b0844031f5efcf0
Parents: 888254b
Author: joewitt <jo...@apache.org>
Authored: Wed Apr 22 12:52:00 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Wed Apr 22 12:52:00 2015 -0400
----------------------------------------------------------------------
.../nifi/cluster/context/ClusterContext.java | 36 +-
.../cluster/context/ClusterContextImpl.java | 10 +-
.../context/ClusterContextThreadLocal.java | 12 +-
.../cluster/firewall/ClusterNodeFirewall.java | 5 +-
.../impl/FileBasedClusterNodeFirewall.java | 8 +-
.../nifi/cluster/flow/ClusterDataFlow.java | 8 +-
.../apache/nifi/cluster/flow/DataFlowDao.java | 2 +-
.../cluster/flow/DataFlowManagementService.java | 23 +-
.../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 52 +--
.../impl/DataFlowManagementServiceImpl.java | 38 +-
.../nifi/cluster/manager/ClusterManager.java | 8 +-
.../nifi/cluster/manager/NodeResponse.java | 18 +-
.../cluster/manager/impl/WebClusterManager.java | 374 +++++++++----------
.../java/org/apache/nifi/cluster/node/Node.java | 2 +-
...anagerProtocolServiceLocatorFactoryBean.java | 2 +-
.../spring/WebClusterManagerFactoryBean.java | 8 +-
.../apache/nifi/web/ConfigurationRequest.java | 9 +-
.../apache/nifi/web/ConfigurationSnapshot.java | 3 +-
.../org/apache/nifi/web/FlowModification.java | 16 +-
.../nifi/web/OptimisticLockingManager.java | 25 +-
.../web/StandardOptimisticLockingManager.java | 32 +-
.../org/apache/nifi/web/UpdateRevision.java | 6 +-
.../org/apache/nifi/web/security/DnUtils.java | 14 +-
.../anonymous/NiFiAnonymousUserFilter.java | 4 +-
.../NiFiAuthenticationEntryPoint.java | 11 +-
.../authorization/NiFiAuthorizationService.java | 38 +-
.../nifi/web/security/user/NiFiUserDetails.java | 16 +-
.../nifi/web/security/user/NiFiUserUtils.java | 6 +-
.../x509/SubjectDnX509PrincipalExtractor.java | 6 -
.../security/x509/X509AuthenticationFilter.java | 50 +--
.../security/x509/X509CertificateExtractor.java | 4 +-
.../x509/ocsp/OcspCertificateValidator.java | 43 +--
.../NiFiAuthorizationServiceTest.java | 91 ++---
33 files changed, 469 insertions(+), 511 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
index 44fb25a..8c3e41b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
@@ -22,38 +22,44 @@ import org.apache.nifi.action.Action;
import org.apache.nifi.web.Revision;
/**
- * Contains contextual information about clustering that may be serialized
+ * Contains contextual information about clustering that may be serialized
* between manager and node when communicating over HTTP.
*/
public interface ClusterContext extends Serializable {
-
+
/**
- * Returns a list of auditable actions. The list is modifiable
- * and will never be null.
+ * Returns a list of auditable actions. The list is modifiable and will
+ * never be null.
+ *
* @return a collection of actions
*/
List<Action> getActions();
-
+
Revision getRevision();
-
+
void setRevision(Revision revision);
-
+
/**
- * @return true if the request was sent by the cluster manager; false otherwise
+ * @return true if the request was sent by the cluster manager; false
+ * otherwise
*/
boolean isRequestSentByClusterManager();
-
+
/**
* Sets the flag to indicate if a request was sent by the cluster manager.
- * @param flag true if the request was sent by the cluster manager; false otherwise
+ *
+ * @param flag true if the request was sent by the cluster manager; false
+ * otherwise
*/
void setRequestSentByClusterManager(boolean flag);
-
+
/**
- * Gets an id generation seed. This is used to ensure that nodes are able to generate the
- * same id across the cluster. This is usually handled by the cluster manager creating the
- * id, however for some actions (snippets, templates, etc) this is not possible.
- * @return
+ * Gets an id generation seed. This is used to ensure that nodes are able to
+ * generate the same id across the cluster. This is usually handled by the
+ * cluster manager creating the id, however for some actions (snippets,
+ * templates, etc) this is not possible.
+ *
+ * @return generated id seed
*/
String getIdGenerationSeed();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
index 06907d2..43e7c2d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
@@ -29,13 +29,13 @@ import org.apache.nifi.web.Revision;
public class ClusterContextImpl implements ClusterContext, Serializable {
private final List<Action> actions = new ArrayList<>();
-
+
private Revision revision;
-
+
private boolean requestSentByClusterManager;
-
+
private final String idGenerationSeed = UUID.randomUUID().toString();
-
+
@Override
public List<Action> getActions() {
return actions;
@@ -55,7 +55,7 @@ public class ClusterContextImpl implements ClusterContext, Serializable {
public boolean isRequestSentByClusterManager() {
return requestSentByClusterManager;
}
-
+
@Override
public void setRequestSentByClusterManager(boolean requestSentByClusterManager) {
this.requestSentByClusterManager = requestSentByClusterManager;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
index c8c7206..79900fb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
@@ -20,23 +20,23 @@ package org.apache.nifi.cluster.context;
* Manages a cluster context on a threadlocal.
*/
public class ClusterContextThreadLocal {
-
+
private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>();
-
+
public static void removeContext() {
contextHolder.remove();
}
-
+
public static ClusterContext createEmptyContext() {
return new ClusterContextImpl();
}
-
+
public static ClusterContext getContext() {
return contextHolder.get();
}
-
+
public static void setContext(final ClusterContext context) {
contextHolder.set(context);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
index 2e3d278..08d21a5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
@@ -27,8 +27,9 @@ public interface ClusterNodeFirewall {
* false otherwise.
*
* If an IP is given, then it must be formatted in dotted decimal notation.
- * @param hostOrIp
- * @return
+ *
+ * @param hostOrIp the hostname or IP address to check; an IP must be in dotted decimal notation
+ * @return true if permissible
*/
boolean isPermissible(String hostOrIp);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
index 916ec14..5219629 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
@@ -16,10 +16,14 @@
*/
package org.apache.nifi.cluster.firewall.impl;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.commons.net.util.SubnetUtils;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.util.file.FileUtils;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
index c17b429..2803d4c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@ -39,13 +39,13 @@ public class ClusterDataFlow {
}
public byte[] getControllerServices() {
- return controllerServices;
+ return controllerServices;
}
-
+
public byte[] getReportingTasks() {
- return reportingTasks;
+ return reportingTasks;
}
-
+
public NodeIdentifier getPrimaryNodeId() {
return primaryNodeId;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
index a273704..9ee5aa8 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
@@ -36,7 +36,7 @@ public interface DataFlowDao {
* Saves the cluster's dataflow.
*
*
- * @param dataFlow
+ * @param dataFlow the cluster's dataflow to save
* @throws DaoException if the dataflow was unable to be saved
*/
void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
index 082d65e..f354507 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
*
* Clients must call start() and stop() to initialize and stop the instance.
*
- * @author unattributed
*/
public interface DataFlowManagementService {
@@ -68,21 +67,23 @@ public interface DataFlowManagementService {
void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
/**
- * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
- *
- * @param serializedControllerServices
- * @throws DaoException
+ * Updates the dataflow with the given serialized form of the Controller
+ * Services that are to exist on the NCM.
+ *
+ * @param serializedControllerServices the serialized form of the Controller Services
+ * @throws DaoException if the dataflow could not be updated
*/
void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
-
+
/**
- * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
- *
- * @param serviceNodes
- * @throws DaoException
+ * Updates the dataflow with the given serialized form of Reporting Tasks
+ * that are to exist on the NCM.
+ *
+ * @param serializedReportingTasks the serialized form of the Reporting Tasks
+ * @throws DaoException if the dataflow could not be updated
*/
void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException;
-
+
/**
* Sets the state of the flow.
*
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index dd9d2a3..e2690f7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -187,36 +187,35 @@ public class DataFlowDaoImpl implements DataFlowDao {
throw new DaoException(ex);
}
}
-
-
+
private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException {
try (final FileInputStream primaryFis = new FileInputStream(primaryFile);
- final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis);
- final FileInputStream restoreFis = new FileInputStream(restoreFile);
- final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) {
-
+ final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis);
+ final FileInputStream restoreFis = new FileInputStream(restoreFile);
+ final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) {
+
final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
- if ( primaryEntry == null && restoreEntry == null ) {
+ if (primaryEntry == null && restoreEntry == null) {
return;
}
- if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) {
+ if ((primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null)) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
-
+
final byte[] primaryMd5 = calculateMd5(primaryIn);
final byte[] restoreMd5 = calculateMd5(restoreIn);
-
- if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
+
+ if (!Arrays.equals(primaryMd5, restoreMd5)) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
}
}
-
+
private byte[] calculateMd5(final InputStream in) throws IOException {
final MessageDigest digest;
try {
@@ -224,7 +223,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
} catch (final NoSuchAlgorithmException nsae) {
throw new IOException(nsae);
}
-
+
int len;
final byte[] buffer = new byte[8192];
while ((len = in.read(buffer)) > -1) {
@@ -257,12 +256,14 @@ public class DataFlowDaoImpl implements DataFlowDao {
if (primaryStateFile == null) {
writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
} else {
- throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory '%s' exists, but it does not exist in the restore directory '%s'",
+ throw new DaoException(String.format("Unable to save dataflow because dataflow state file in primary directory "
+ + "'%s' exists, but it does not exist in the restore directory '%s'",
primaryDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
}
} else {
if (primaryStateFile == null) {
- throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory '%s' exists, but it does not exist in the primary directory '%s'",
+ throw new DaoException(String.format("Unable to save dataflow because dataflow state file in restore directory "
+ + "'%s' exists, but it does not exist in the primary directory '%s'",
restoreDirectory.getAbsolutePath(), primaryDirectory.getAbsolutePath()));
} else {
final PersistedFlowState primaryFlowState = getPersistedFlowState(primaryStateFile);
@@ -270,14 +271,15 @@ public class DataFlowDaoImpl implements DataFlowDao {
if (primaryFlowState == restoreFlowState) {
writeDataFlow(restoreStateFile, dataFlow);
} else {
- throw new DaoException(String.format("Unable to save dataflow because state file in primary directory '%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
+ throw new DaoException(String.format("Unable to save dataflow because state file in primary directory "
+ + "'%s' has state '%s', but the state file in the restore directory '%s' has state '%s'",
primaryDirectory.getAbsolutePath(), primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
}
}
}
}
- // write dataflow to primary
+ // write dataflow to primary
if (primaryStateFile == null) {
writeDataFlow(createNewFlowStateFile(primaryDirectory), dataFlow);
} else {
@@ -477,7 +479,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
byte[] clusterInfoBytes = new byte[0];
byte[] controllerServiceBytes = new byte[0];
byte[] reportingTaskBytes = new byte[0];
-
+
try (final InputStream inStream = new FileInputStream(file);
final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
TarArchiveEntry tarEntry;
@@ -500,13 +502,13 @@ public class DataFlowDaoImpl implements DataFlowDao {
StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
break;
case CONTROLLER_SERVICES_FILENAME:
- controllerServiceBytes = new byte[(int) tarEntry.getSize()];
- StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
- break;
+ controllerServiceBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
+ break;
case REPORTING_TASKS_FILENAME:
- reportingTaskBytes = new byte[(int) tarEntry.getSize()];
- StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
- break;
+ reportingTaskBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
+ break;
default:
throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
}
@@ -559,7 +561,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
final DataFlow dataFlow = clusterDataFlow.getDataFlow();
- if ( dataFlow == null ) {
+ if (dataFlow == null) {
writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
index 1bb8ca3..4fa6504 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -64,12 +64,11 @@ import org.slf4j.LoggerFactory;
public class DataFlowManagementServiceImpl implements DataFlowManagementService {
/*
- * Developer Note:
- *
+ * Developer Note:
+ *
* This class maintains an ExecutorService and a Runnable.
* Although the class is not externally threadsafe, its internals are protected to
* accommodate multithread access between the ExecutorServer and the Runnable.
- *
*/
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
@@ -170,13 +169,12 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
resourceLock.unlock("updatePrimaryNode");
}
}
-
-
+
@Override
public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
- resourceLock.lock();
- try {
- final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final StandardDataFlow dataFlow;
final byte[] reportingTaskBytes;
@@ -192,16 +190,16 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
}
flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
- } finally {
- resourceLock.unlock("updateControllerServices");
- }
+ } finally {
+ resourceLock.unlock("updateControllerServices");
+ }
}
-
+
@Override
public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
- resourceLock.lock();
- try {
- final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final StandardDataFlow dataFlow;
final byte[] controllerServiceBytes;
@@ -217,9 +215,9 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
}
flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
- } finally {
- resourceLock.unlock("updateControllerServices");
- }
+ } finally {
+ resourceLock.unlock("updateControllerServices");
+ }
}
@Override
@@ -361,8 +359,8 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
if (existingClusterDataFlow == null) {
currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
} else {
- currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(),
- existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(),
+ existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
}
flowDao.saveDataFlow(currentClusterDataFlow);
flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 3a1dfb2..be52e0f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -77,14 +77,14 @@ public interface ClusterManager extends NodeInformant {
Set<Node> getNodes(Status... statuses);
/**
- * @param nodeId
+ * @param nodeId node identifier
* @return returns the node with the given identifier or null if node does
* not exist
*/
Node getNode(String nodeId);
/**
- * @param statuses
+ * @param statuses the node statuses to match
* @return the set of node identifiers with the given node status
*/
Set<NodeIdentifier> getNodeIds(Status... statuses);
@@ -199,9 +199,7 @@ public interface ClusterManager extends NodeInformant {
Node getPrimaryNode();
/**
- * Returns the bulletin repository.
- *
- * @return
+ * @return the bulletin repository
*/
BulletinRepository getBulletinRepository();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 8bc73ab..958d600 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -192,19 +192,19 @@ public class NodeResponse {
}
/**
- * If this node response has been merged returns the updated entity,
- * otherwise null. Also returns null if hasThrowable() is true. The
- * intent of this method is to support getting the response entity
- * when it was already consumed during the merge operation. In this
- * case the client response rom getClientResponse() will not support
- * a getEntity(...) or getEntityInputStream() call.
- *
- * @return
+ * If this node response has been merged returns the updated entity,
+ * otherwise null. Also returns null if hasThrowable() is true. The intent
+ * of this method is to support getting the response entity when it was
+ * already consumed during the merge operation. In this case the client
+ * response from getClientResponse() will not support a getEntity(...) or
+ * getEntityInputStream() call.
+ *
+ * @return the updated entity if this response has been merged; null otherwise or if hasThrowable() is true
*/
public Entity getUpdatedEntity() {
return updatedEntity;
}
-
+
/**
* Creates a Response by mapping the ClientResponse values to it. Since the
* ClientResponse's input stream can only be read once, this method should
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index db6421e..94ea17f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -318,13 +318,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
-
+
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
-
+
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
private final HttpResponseMapper httpResponseMapper;
@@ -427,14 +427,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public void heartbeat() {
}
}, this, encryptor);
-
+
// When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
// going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
-
+
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository);
}
@@ -479,10 +479,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
final byte[] serializedServices = clusterDataFlow.getControllerServices();
- if ( serializedServices != null && serializedServices.length > 0 ) {
- ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
+ if (serializedServices != null && serializedServices.length > 0) {
+ ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
}
-
+
// start multicast broadcasting service, if configured
if (servicesBroadcaster != null) {
servicesBroadcaster.start();
@@ -493,8 +493,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// Load and start running Reporting Tasks
final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
- if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) {
- loadReportingTasks(serializedReportingTasks);
+ if (serializedReportingTasks != null && serializedReportingTasks.length > 0) {
+ loadReportingTasks(serializedReportingTasks);
}
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
@@ -558,10 +558,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
servicesBroadcaster.stop();
}
- if ( processScheduler != null ) {
+ if (processScheduler != null) {
processScheduler.shutdown();
}
-
+
if (encounteredException) {
throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown. Check the logs for details.");
}
@@ -946,7 +946,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final String scheduleStateValue = DomUtils.getChild(taskElement, "scheduledState").getTextContent().trim();
final ScheduledState scheduledState = ScheduledState.valueOf(scheduleStateValue);
-
+
// Reporting Task Properties
for (final Element property : DomUtils.getChildElementsByTagName(taskElement, "property")) {
final String name = DomUtils.getChildText(property, "name");
@@ -969,21 +969,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask);
- final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName,
+ final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName,
schedulingStrategy, taskSchedulingPeriod, componentLog, this);
reportingTask.initialize(config);
final String annotationData = DomUtils.getChildText(taskElement, "annotationData");
- if ( annotationData != null ) {
+ if (annotationData != null) {
reportingTaskNode.setAnnotationData(annotationData.trim());
}
-
+
final Map<PropertyDescriptor, String> resolvedProps;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
resolvedProps = new HashMap<>();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
- if ( entry.getValue() == null ) {
+ if (entry.getValue() == null) {
resolvedProps.put(descriptor, descriptor.getDefaultValue());
} else {
resolvedProps.put(descriptor, entry.getValue());
@@ -992,24 +992,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
- if ( entry.getValue() != null ) {
+ if (entry.getValue() != null) {
reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
}
}
-
+
final String comments = DomUtils.getChildText(taskElement, "comment");
- if ( comments != null ) {
+ if (comments != null) {
reportingTaskNode.setComments(comments);
}
reportingTaskNode.setScheduledState(scheduledState);
- if ( ScheduledState.RUNNING.equals(scheduledState) ) {
- if ( reportingTaskNode.isValid() ) {
+ if (ScheduledState.RUNNING.equals(scheduledState)) {
+ if (reportingTaskNode.isValid()) {
try {
processScheduler.schedule(reportingTaskNode);
} catch (final Exception e) {
logger.error("Failed to start {} due to {}", reportingTaskNode, e);
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
@@ -1017,8 +1017,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
logger.error("Failed to start {} because it is invalid due to {}", reportingTaskNode, reportingTaskNode.getValidationErrors());
}
}
-
-
+
tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
@@ -1031,7 +1030,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return tasks;
}
-
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null) {
@@ -1064,16 +1062,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
taskNode.setName(task.getClass().getSimpleName());
-
+
reportingTasks.put(id, taskNode);
- if ( firstTimeAdded ) {
+ if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
}
}
-
+
return taskNode;
}
@@ -1372,7 +1370,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
writeLock.unlock("handleControllerStartupFailure");
}
}
-
+
/**
* Adds an instance of a specified controller service.
*
@@ -1383,7 +1381,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
*/
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
}
@Override
@@ -1410,82 +1408,80 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
}
-
+
@Override
public String getControllerServiceName(final String serviceIdentifier) {
- return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+ return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
}
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.removeControllerService(serviceNode);
}
-
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableControllerService(serviceNode);
}
-
+
@Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
controllerServiceProvider.enableControllerServices(serviceNodes);
}
-
+
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableControllerService(serviceNode);
}
-
+
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
- return controllerServiceProvider.getAllControllerServices();
+ return controllerServiceProvider.getAllControllerServices();
}
-
-
+
@Override
public void disableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableReferencingServices(serviceNode);
}
-
+
@Override
public void enableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableReferencingServices(serviceNode);
}
-
+
@Override
public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
}
-
+
@Override
public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
}
-
+
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
}
-
+
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
}
-
+
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
}
-
+
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
}
-
+
private byte[] serialize(final Document doc) throws TransformerException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DOMSource domSource = new DOMSource(doc);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DOMSource domSource = new DOMSource(doc);
final StreamResult streamResult = new StreamResult(baos);
// configure the transformer and convert the DOM
@@ -1498,91 +1494,89 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
transformer.transform(domSource, streamResult);
return baos.toByteArray();
}
-
+
private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
- final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
- final Element rootElement = document.createElement("controllerServices");
- document.appendChild(rootElement);
-
- for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
- StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
- }
-
- return serialize(document);
- }
-
+ final Element rootElement = document.createElement("controllerServices");
+ document.appendChild(rootElement);
+
+ for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+ StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
+ }
+
+ return serialize(document);
+ }
+
private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
- final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
- final Element rootElement = document.createElement("reportingTasks");
- document.appendChild(rootElement);
-
- for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
- StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
- }
-
- return serialize(document);
- }
-
-
+ final Element rootElement = document.createElement("reportingTasks");
+ document.appendChild(rootElement);
+
+ for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
+ StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
+ }
+
+ return serialize(document);
+ }
+
public void saveControllerServices() {
- try {
- dataFlowManagementService.updateControllerServices(serializeControllerServices());
- } catch (final Exception e) {
- logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
- if ( logger.isDebugEnabled() ) {
- logger.error("", e);
- }
-
- getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
- "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
- }
- }
-
+ try {
+ dataFlowManagementService.updateControllerServices(serializeControllerServices());
+ } catch (final Exception e) {
+ logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+
+ getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
+ "Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
+ }
+ }
+
public void saveReportingTasks() {
- try {
- dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
- } catch (final Exception e) {
- logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
- if ( logger.isDebugEnabled() ) {
- logger.error("", e);
- }
-
- getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
- "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
- }
+ try {
+ dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
+ } catch (final Exception e) {
+ logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+
+ getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
+ "Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
+ }
}
@Override
public Set<ReportingTaskNode> getAllReportingTasks() {
- readLock.lock();
- try {
- return new HashSet<>(reportingTasks.values());
- } finally {
- readLock.unlock("getReportingTasks");
- }
+ readLock.lock();
+ try {
+ return new HashSet<>(reportingTasks.values());
+ } finally {
+ readLock.unlock("getReportingTasks");
+ }
}
@Override
public ReportingTaskNode getReportingTaskNode(final String taskId) {
- readLock.lock();
- try {
- return reportingTasks.get(taskId);
- } finally {
- readLock.unlock("getReportingTaskNode");
- }
+ readLock.lock();
+ try {
+ return reportingTasks.get(taskId);
+ } finally {
+ readLock.unlock("getReportingTaskNode");
+ }
}
@Override
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanStart();
- processScheduler.schedule(reportingTaskNode);
+ processScheduler.schedule(reportingTaskNode);
}
-
@Override
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanStop();
@@ -1591,52 +1585,50 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
- writeLock.lock();
- try {
- final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
- if ( existing == null || existing != reportingTaskNode ) {
- throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
- }
-
- reportingTaskNode.verifyCanDelete();
-
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
- }
-
- for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
- final PropertyDescriptor descriptor = entry.getKey();
- if (descriptor.getControllerServiceDefinition() != null ) {
- final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
- if ( value != null ) {
- final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
- if ( serviceNode != null ) {
- serviceNode.removeReference(reportingTaskNode);
- }
- }
- }
- }
-
- reportingTasks.remove(reportingTaskNode.getIdentifier());
- } finally {
- writeLock.unlock("removeReportingTask");
- }
- }
-
-
+ writeLock.lock();
+ try {
+ final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+ if (existing == null || existing != reportingTaskNode) {
+ throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+ }
+
+ reportingTaskNode.verifyCanDelete();
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+ }
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.getControllerServiceDefinition() != null) {
+ final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+ if (value != null) {
+ final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
+ if (serviceNode != null) {
+ serviceNode.removeReference(reportingTaskNode);
+ }
+ }
+ }
+ }
+
+ reportingTasks.remove(reportingTaskNode.getIdentifier());
+ } finally {
+ writeLock.unlock("removeReportingTask");
+ }
+ }
+
@Override
public void disableReportingTask(final ReportingTaskNode reportingTask) {
reportingTask.verifyCanDisable();
processScheduler.disableReportingTask(reportingTask);
}
-
+
@Override
public void enableReportingTask(final ReportingTaskNode reportingTask) {
reportingTask.verifyCanEnable();
processScheduler.enableReportingTask(reportingTask);
}
-
-
+
/**
* Handle a bulletins message.
*
@@ -2336,7 +2328,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// merge the response
final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
holder.set(clientResponse);
-
+
// if we have a response get the updated cluster context for auditing and revision updating
Revision updatedRevision = null;
if (mutableRequest && clientResponse != null) {
@@ -2367,18 +2359,18 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
}
}
-
+
return updatedRevision;
}
};
-
+
// federate the request and lock on the revision
if (mutableRequest) {
optimisticLockingManager.setRevision(federateRequest);
} else {
federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
}
-
+
return holder.get();
}
@@ -2387,7 +2379,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private static boolean isProcessorEndpoint(final URI uri, final String method) {
- if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) ) {
+ if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches() || CLUSTER_PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
return true;
} else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
@@ -2434,11 +2426,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private static boolean isProvenanceEventEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
}
-
+
private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
}
-
+
private static boolean isControllerServiceEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
@@ -2448,19 +2440,19 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return false;
}
-
+
private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
-
+
return false;
}
-
+
private static boolean isReportingTasksEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath());
}
-
+
private static boolean isReportingTaskEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
@@ -2661,7 +2653,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
}
}
-
+
private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
final Map<String, Integer> activeThreadCounts = new HashMap<>();
final Map<String, String> states = new HashMap<>();
@@ -2669,7 +2661,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue();
// go through all of the node's referencing components
- if ( nodeReferencingComponents != null ) {
+ if (nodeReferencingComponents != null) {
for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
// handle active thread counts
if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {
@@ -2680,7 +2672,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current);
}
}
-
+
// handle controller service state
final String state = states.get(nodeReferencingComponent.getId());
if (state == null) {
@@ -2692,7 +2684,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
}
- }
+ }
// go through each referencing component
for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) {
@@ -2700,24 +2692,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (activeThreadCount != null) {
referencingComponent.setActiveThreadCount(activeThreadCount);
}
-
+
final String state = states.get(referencingComponent.getId());
if (state != null) {
referencingComponent.setState(state);
}
}
}
-
+
private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents();
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
-
+
String state = null;
for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) {
final NodeIdentifier nodeId = nodeEntry.getKey();
final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
-
+
if (state == null) {
if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
state = ControllerServiceState.DISABLING.name();
@@ -2725,27 +2717,27 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
state = ControllerServiceState.ENABLING.name();
}
}
-
+
for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents());
}
-
+
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
}
-
+
// merge the referencing components
mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
-
+
// store the 'transition' state if applicable
if (state != null) {
controllerService.setState(state);
}
-
+
// set the merged validation errors
controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size()));
}
-
+
private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
@@ -2757,24 +2749,25 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (nodeReportingTask.getActiveThreadCount() != null) {
activeThreadCount += nodeReportingTask.getActiveThreadCount();
}
-
+
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors());
}
// set the merged active thread counts
reportingTask.setActiveThreadCount(activeThreadCount);
-
+
// set the merged validation errors
reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size()));
}
/**
- * Merges the validation errors into the specified map, recording the corresponding node identifier.
- *
+ * Merges the validation errors into the specified map, recording the
+ * corresponding node identifier.
+ *
* @param validationErrorMap
* @param nodeId
- * @param nodeValidationErrors
+ * @param nodeValidationErrors
*/
public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
if (nodeValidationErrors != null) {
@@ -2788,13 +2781,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
}
-
+
/**
- * Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes.
- *
+ * Normalizes the validation errors by prepending the corresponding nodes
+ * when the error does not exist across all nodes.
+ *
* @param validationErrorMap
* @param totalNodes
- * @return
+ * @return
*/
public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
final Set<String> normalizedValidationErrors = new HashSet<>();
@@ -2812,7 +2806,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
return normalizedValidationErrors;
}
-
+
// requires write lock to be already acquired unless request is not mutable
private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
// holds the one response of all the node responses to return to the client
@@ -3105,7 +3099,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
} else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) {
final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
final ControllerServiceDTO controllerService = responseEntity.getControllerService();
-
+
final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
@@ -3118,12 +3112,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
}
mergeControllerService(controllerService, resultsMap);
-
+
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) {
final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices();
-
+
final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
@@ -3156,7 +3150,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
} else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) {
final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents();
-
+
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
@@ -3169,12 +3163,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
}
mergeControllerServiceReferences(referencingComponents, resultsMap);
-
+
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) {
final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
final ReportingTaskDTO reportingTask = responseEntity.getReportingTask();
-
+
final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
@@ -3187,12 +3181,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
}
mergeReportingTask(reportingTask, resultsMap);
-
+
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) {
final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks();
-
+
final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
index 84565da..1b128f7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
@@ -161,7 +161,7 @@ public class Node implements Cloneable, Comparable<Node> {
*
* This method is thread-safe and may be called without obtaining any lock.
*
- * @param connectionRequestedTimestamp
+ * @param connectionRequestedTimestamp timestamp
*/
public void setConnectionRequestedTimestamp(long connectionRequestedTimestamp) {
this.connectionRequestedTimestamp.set(connectionRequestedTimestamp);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
index e26d196..c369a7f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/ClusterManagerProtocolServiceLocatorFactoryBean.java
@@ -51,7 +51,7 @@ public class ClusterManagerProtocolServiceLocatorFactoryBean implements FactoryB
@Override
public Object getObject() throws Exception {
/*
- * If configured for the cluster manager, then the service locator is never used.
+ * If configured for the cluster manager, then the service locator is never used.
*/
if (properties.isClusterManager()) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index d3cff3b..7bcb203 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -49,7 +49,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
private NiFiProperties properties;
private StringEncryptor encryptor;
-
+
private OptimisticLockingManager optimisticLockingManager;
@Override
@@ -58,8 +58,8 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both.");
} else if (!properties.isClusterManager()) {
/*
- * If not configured for the cluster manager, then the cluster manager is never used.
- * null is returned so that we don't instantiate a thread pool or other resources.
+ * If not configured for the cluster manager, then the cluster manager is never used.
+ * null is returned so that we don't instantiate a thread pool or other resources.
*/
return null;
} else if (clusterManager == null) {
@@ -127,7 +127,7 @@ public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationCon
public void setEncryptor(final StringEncryptor encryptor) {
this.encryptor = encryptor;
}
-
+
public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) {
this.optimisticLockingManager = optimisticLockingManager;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
index 939c3f0..c2e940a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationRequest.java
@@ -20,14 +20,15 @@ package org.apache.nifi.web;
* Represents a request to configure. The implementation's execute method will
* perform the configuration action. It will return type T which will be
* encapsulated in a ConfigurationSnapshot.
- *
- * @param <T>
+ *
+ * @param <T> type of the configuration returned by executing the request
*/
public interface ConfigurationRequest<T> {
/**
- * Executes a configuration action and returns the updated resulting configuration.
- *
+ * Executes a configuration action and returns the updated resulting
+ * configuration.
+ *
* @return The resulting configuration
*/
T execute();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
index 8817d69..c706fd2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/ConfigurationSnapshot.java
@@ -18,7 +18,8 @@ package org.apache.nifi.web;
/**
* Response object that captures some configuration for a given revision.
- * @param <T>
+ *
+ * @param <T> type of snapshot
*/
public class ConfigurationSnapshot<T> {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
index f6bccb1..70aa30e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java
@@ -27,9 +27,9 @@ public class FlowModification {
/**
* Creates a new FlowModification.
- *
- * @param revision
- * @param lastModifier
+ *
+ * @param revision revision
+ * @param lastModifier modifier
*/
public FlowModification(Revision revision, String lastModifier) {
this.revision = revision;
@@ -38,8 +38,8 @@ public class FlowModification {
/**
* Get the revision.
- *
- * @return
+ *
+ * @return the revision
*/
public Revision getRevision() {
return revision;
@@ -47,11 +47,11 @@ public class FlowModification {
/**
* Get the last modifier.
- *
- * @return
+ *
+ * @return the modifier
*/
public String getLastModifier() {
return lastModifier;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9dda16c9/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
index 4c54b7c..3cb1d45 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java
@@ -26,26 +26,27 @@ package org.apache.nifi.web;
public interface OptimisticLockingManager {
/**
- * Attempts to execute the specified configuration request using the specified revision within a lock.
- *
- * @param <T>
- * @param revision
- * @param configurationRequest
- * @return
+ * Attempts to execute the specified configuration request using the
+ * specified revision within a lock.
+ *
+ * @param <T> type of snapshot
+ * @param revision revision
+ * @param configurationRequest request
+ * @return snapshot
*/
<T> ConfigurationSnapshot<T> configureFlow(Revision revision, ConfigurationRequest<T> configurationRequest);
-
+
/**
* Updates the revision using the specified revision within a lock.
- *
- * @param updateRevision
+ *
+ * @param updateRevision new revision
*/
void setRevision(UpdateRevision updateRevision);
/**
- * Returns the last flow modification. This is a combination of the revision and the user
- * who performed the modification.
- *
+ * Returns the last flow modification. This is a combination of the revision
+ * and the user who performed the modification.
+ *
* @return the last modification
*/
FlowModification getLastModification();