You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/07/11 12:13:52 UTC
[1/2] nifi git commit: NIFI-2185: Proxy requests through the cluster
coordinator rather than making use of distributed read/write locks. This
closes #621
Repository: nifi
Updated Branches:
refs/heads/master b836db21a -> cf183e15e
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 794c5a1..e340623 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -16,7 +16,24 @@
*/
package org.apache.nifi.web;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
@@ -39,6 +56,8 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -52,25 +71,11 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.concurrent.LockExpiredException;
import org.apache.nifi.web.util.ClientResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
/**
* Implements the NiFiWebConfigurationContext interface to support a context in both standalone and clustered environments.
@@ -274,6 +279,18 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
return componentFacade.updateComponent(requestContext, annotationData, properties);
}
+
+ private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
+ final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
+ if (coordinatorNode == null) {
+ throw new NoClusterCoordinatorException();
+ }
+
+ final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
+ return requestReplicator.replicate(coordinatorNodes, method, uri, entity, headers, false).awaitMergedResponse();
+ }
+
+
/**
* Facade over accessing different types of NiFi components.
*/
@@ -331,7 +348,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// replicate request
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
+ nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
@@ -396,7 +413,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// replicate request
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, processorEntity, headers).awaitMergedResponse();
+ nodeResponse = replicate(HttpMethod.PUT, requestUrl, processorEntity, headers);
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
@@ -412,20 +429,9 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
processor = entity.getComponent();
} else {
// update processor within write lock
- final String writeLockId = serviceFacade.obtainWriteLock();
- try {
- processor = serviceFacade.withWriteLock(writeLockId, () -> {
- ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
- final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
- return entity.getComponent();
- });
- } finally {
- // ensure the lock is released
- try {
- serviceFacade.releaseWriteLock(writeLockId);
- } catch (final LockExpiredException e) {
- }
- }
+ ProcessorDTO processorDTO = buildProcessorDto(id, annotationData, properties);
+ final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO);
+ processor = entity.getComponent();
}
// return the processor info
@@ -530,7 +536,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// replicate request
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
+ nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
@@ -569,20 +575,9 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
controllerServiceDto.setAnnotationData(annotationData);
controllerServiceDto.setProperties(properties);
- // update controller service within write lock
- final String writeLockId = serviceFacade.obtainWriteLock();
- try {
- controllerService = serviceFacade.withWriteLock(writeLockId, () -> {
- final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
- return entity.getComponent();
- });
- } finally {
- // ensure the lock is released
- try {
- serviceFacade.releaseWriteLock(writeLockId);
- } catch (final LockExpiredException e) {
- }
- }
+ // update controller service
+ final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerServiceDto);
+ controllerService = entity.getComponent();
} else {
// if this is a standalone instance the service should have been found above... there should
// no cluster to replicate the request to
@@ -628,7 +623,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// replicate request
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers).awaitMergedResponse();
+ nodeResponse = replicate(HttpMethod.PUT, requestUrl, controllerServiceEntity, headers);
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
@@ -702,7 +697,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// replicate request
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext)).awaitMergedResponse();
+ nodeResponse = replicate(HttpMethod.GET, requestUrl, parameters, getHeaders(requestContext));
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
@@ -742,20 +737,9 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
reportingTaskDto.setProperties(properties);
// obtain write lock
- final String writeLockId = serviceFacade.obtainWriteLock();
- try {
- reportingTask = serviceFacade.withWriteLock(writeLockId, () -> {
- serviceFacade.verifyRevision(revision, user);
- final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
- return entity.getComponent();
- });
- } finally {
- // ensure the lock is released
- try {
- serviceFacade.releaseWriteLock(writeLockId);
- } catch (final LockExpiredException e) {
- }
- }
+ serviceFacade.verifyRevision(revision, user);
+ final ReportingTaskEntity entity = serviceFacade.updateReportingTask(revision, reportingTaskDto);
+ reportingTask = entity.getComponent();
} else {
// if this is a standalone instance the task should have been found above... there should
// no cluster to replicate the request to
@@ -801,7 +785,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
// replicate request
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers).awaitMergedResponse();
+ nodeResponse = replicate(HttpMethod.PUT, requestUrl, reportingTaskEntity, headers);
} catch (final InterruptedException e) {
throw new IllegalClusterStateException("Request was interrupted while waiting for response from node");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 6fdca03..42f836c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -49,6 +49,8 @@ import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.Snippet;
@@ -61,7 +63,6 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.concurrent.LockExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +297,9 @@ public abstract class ApplicationResource {
final Map<String, String> result = new HashMap<>();
final Map<String, String> overriddenHeadersIgnoreCaseMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
- overriddenHeadersIgnoreCaseMap.putAll(overriddenHeaders);
+ if (overriddenHeaders != null) {
+ overriddenHeadersIgnoreCaseMap.putAll(overriddenHeaders);
+ }
final Enumeration<String> headerNames = httpServletRequest.getHeaderNames();
while (headerNames.hasMoreElements()) {
@@ -346,10 +349,6 @@ public abstract class ApplicationResource {
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
}
- protected boolean isLockCancelationPhase(final HttpServletRequest httpServletRequest) {
- return httpServletRequest.getHeader(RequestReplicator.LOCK_CANCELATION_HEADER) != null;
- }
-
/**
* Checks whether or not the request should be replicated to the cluster
*
@@ -482,59 +481,21 @@ public abstract class ApplicationResource {
final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action,
final Runnable verifyRevision) {
- if (isLockCancelationPhase(httpServletRequest)) {
- final String lockVersionId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
- try {
- serviceFacade.releaseWriteLock(lockVersionId);
- } catch (final Exception e) {
- // If the lock has expired, then it has already been unlocked.
- }
-
- return generateOkResponse().build();
+ final boolean validationPhase = isValidationPhase(httpServletRequest);
+ if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
+ // authorize access
+ serviceFacade.authorizeAccess(authorizer);
+ verifyRevision.run();
}
- String lockId = null;
- try {
- final boolean validationPhase = isValidationPhase(httpServletRequest);
- if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
- // authorize access
- serviceFacade.authorizeAccess(authorizer);
-
- lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
- lockId = serviceFacade.obtainWriteLock(lockId);
- verifyRevision.run();
- } else {
- lockId = httpServletRequest.getHeader(RequestReplicator.LOCK_VERSION_ID_HEADER);
- }
-
- if (validationPhase) {
- if (verifier != null) {
- verifier.run();
- }
- return generateContinueResponse().build();
- }
-
- try {
- return serviceFacade.withWriteLock(lockId, () -> action.get());
- } finally {
- try {
- serviceFacade.releaseWriteLock(lockId);
- } catch (final LockExpiredException e) {
- // If the lock expires here, it's okay. We've already completed our action,
- // so the expiration of the lock is of no consequence to us.
- }
- }
- } catch (final RuntimeException t) {
- if (lockId != null) {
- try {
- serviceFacade.releaseWriteLock(lockId);
- } catch (final Exception e) {
- t.addSuppressed(e);
- }
+ if (validationPhase) {
+ if (verifier != null) {
+ verifier.run();
}
-
- throw t;
+ return generateContinueResponse().build();
}
+
+ return action.get();
}
/**
@@ -582,16 +543,57 @@ public abstract class ApplicationResource {
throw new UnknownNodeException("Cannot replicate request " + method + " " + getAbsolutePath() + " to node with ID " + nodeUuid + " because the specified node does not exist.");
}
- final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
final URI path = getAbsolutePath();
try {
final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
- return requestReplicator.replicate(targetNodes, method, path, entity, headers).awaitMergedResponse().getResponse();
+
+ // Determine if we should replicate to the node directly or if we should replicate to the Cluster Coordinator,
+ // and have it replicate the request on our behalf.
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ // If we are to replicate directly to the nodes, we need to indicate that the replication source is
+ // the cluster coordinator so that the node knows to service the request.
+ final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
+ return requestReplicator.replicate(targetNodes, method, path, entity, headers, true).awaitMergedResponse().getResponse();
+ } else {
+ headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
+ return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method,
+ path, entity, headers, false).awaitMergedResponse().getResponse();
+ }
} catch (final InterruptedException ie) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
}
}
+ protected NodeIdentifier getClusterCoordinatorNode() {
+ final NodeIdentifier activeClusterCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
+ if (activeClusterCoordinator != null) {
+ return activeClusterCoordinator;
+ }
+
+ throw new NoClusterCoordinatorException();
+ }
+
+ protected ReplicationTarget getReplicationTarget() {
+ return clusterCoordinator.isActiveClusterCoordinator() ? ReplicationTarget.CLUSTER_NODES : ReplicationTarget.CLUSTER_COORDINATOR;
+ }
+
+ protected Response replicate(final String method, final NodeIdentifier targetNode) {
+ try {
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+ // to the cluster nodes themselves.
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ final Set<NodeIdentifier> nodeIds = Collections.singleton(targetNode);
+ return getRequestReplicator().replicate(nodeIds, method, getAbsolutePath(), getRequestParameters(true), getHeaders(), true).awaitMergedResponse().getResponse();
+ } else {
+ final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+ final Map<String, String> headers = getHeaders(Collections.singletonMap(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, targetNode.getId()));
+ return getRequestReplicator().replicate(coordinatorNode, method, getAbsolutePath(), getRequestParameters(true), headers, false).awaitMergedResponse().getResponse();
+ }
+ } catch (final InterruptedException ie) {
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
+ }
+ }
+
/**
* Convenience method for calling {@link #replicate(String, Object)} with an entity of
* {@link #getRequestParameters() getRequestParameters(true)}
@@ -604,6 +606,18 @@ public abstract class ApplicationResource {
}
/**
+ * Convenience method for calling {@link #replicateNodeResponse(String, Object, Map)} with an entity of
+ * {@link #getRequestParameters() getRequestParameters(true)} and overriding no headers
+ *
+ * @param method the HTTP method to use
+ * @return the response from the request
+ * @throws InterruptedException if interrupted while replicating the request
+ */
+ protected NodeResponse replicateNodeResponse(final String method) throws InterruptedException {
+ return replicateNodeResponse(method, getRequestParameters(true), (Map<String, String>) null);
+ }
+
+ /**
* Replicates the request to all nodes in the cluster using the provided method and entity. The headers
* used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be
* that provided by the {@link #getAbsolutePath()} method
@@ -621,18 +635,45 @@ public abstract class ApplicationResource {
* used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be
* that provided by the {@link #getAbsolutePath()} method
*
- * @param method the HTTP method to use
- * @param entity the entity to replicate
+ * @param method the HTTP method to use
+ * @param entity the entity to replicate
* @param headersToOverride the headers to override
* @return the response from the request
+ * @see #replicateNodeResponse(String, Object, Map)
*/
protected Response replicate(final String method, final Object entity, final Map<String, String> headersToOverride) {
- final URI path = getAbsolutePath();
try {
- final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
- return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse().getResponse();
+ return replicateNodeResponse(method, entity, headersToOverride).getResponse();
} catch (final InterruptedException ie) {
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + path + " was interrupted").type("text/plain").build();
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Request to " + method + " " + getAbsolutePath() + " was interrupted").type("text/plain").build();
+ }
+ }
+
+ /**
+ * Replicates the request using the provided method and entity: directly to the nodes in the cluster when this
+ * node is the active cluster coordinator, otherwise only to the cluster coordinator. The headers used will be
+ * those provided by the {@link #getHeaders()} method. The URI that will be used will be that provided by the
+ * {@link #getAbsolutePath()} method. This method returns the NodeResponse, rather than a Response object.
+ *
+ * @param method the HTTP method to use
+ * @param entity the entity to replicate
+ * @param headersToOverride the headers to override
+ *
+ * @return the response from the request
+ *
+ * @throws InterruptedException if interrupted while replicating the request
+ * @see #replicate(String, Object, Map)
+ */
+ protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
+ final URI path = getAbsolutePath();
+ final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
+
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+ // to the cluster nodes themselves.
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse();
+ } else {
+ return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false).awaitMergedResponse();
}
}
@@ -667,4 +708,8 @@ public abstract class ApplicationResource {
protected NiFiProperties getProperties() {
return properties;
}
+
+ public static enum ReplicationTarget {
+ CLUSTER_NODES, CLUSTER_COORDINATOR;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
index 7428fb0..fea5d30 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java
@@ -16,13 +16,23 @@
*/
package org.apache.nifi.web.api;
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.util.Collections;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
@@ -43,21 +53,13 @@ import org.apache.nifi.web.api.entity.CounterEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.Entity;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.Collections;
-import java.util.Set;
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
/**
@@ -148,7 +150,17 @@ public class CountersResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse;
+
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+ // to the cluster nodes themselves.
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ } else {
+ final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+ nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), false).awaitMergedResponse();
+ }
+
final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -164,10 +176,7 @@ public class CountersResource extends ApplicationResource {
throw new UnknownNodeException("The specified cluster node does not exist.");
}
- final Set<NodeIdentifier> targetNodes = Collections.singleton(targetNode);
-
- // replicate the request to the specific node
- return getRequestReplicator().replicate(targetNodes, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse().getResponse();
+ return replicate(HttpMethod.GET, targetNode);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
index 2be19e6..18765d0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
@@ -16,12 +16,29 @@
*/
package org.apache.nifi.web.api;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
@@ -41,29 +58,12 @@ import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.Collections;
-import java.util.Set;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a flowfile queue.
@@ -168,10 +168,7 @@ public class FlowFileQueueResource extends ApplicationResource {
throw new UnknownNodeException("The specified cluster node does not exist.");
}
- final Set<NodeIdentifier> targetNodes = Collections.singleton(targetNode);
-
- // replicate the request to the specific node
- return getRequestReplicator().replicate(targetNodes, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse().getResponse();
+ return replicate(HttpMethod.GET, targetNode);
}
}
@@ -256,10 +253,7 @@ public class FlowFileQueueResource extends ApplicationResource {
throw new UnknownNodeException("The specified cluster node does not exist.");
}
- final Set<NodeIdentifier> targetNodes = Collections.singleton(targetNode);
-
- // replicate the request to the specific node
- return getRequestReplicator().replicate(targetNodes, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse().getResponse();
+ return replicate(HttpMethod.GET, targetNode);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index cc7d5aa..961984b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,13 +16,31 @@
*/
package org.apache.nifi.web.api;
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
@@ -104,29 +122,13 @@ import org.apache.nifi.web.api.request.DateTimeParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Flow.
@@ -1219,7 +1221,7 @@ public class FlowResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ProcessorStatusEntity entity = (ProcessorStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1301,7 +1303,7 @@ public class FlowResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1383,7 +1385,7 @@ public class FlowResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1465,7 +1467,7 @@ public class FlowResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final RemoteProcessGroupStatusEntity entity = (RemoteProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1556,7 +1558,7 @@ public class FlowResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ProcessGroupStatusEntity entity = (ProcessGroupStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
@@ -1659,7 +1661,7 @@ public class FlowResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 0a94fea..624937f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,19 +16,42 @@
*/
package org.apache.nifi.web.api;
-import com.sun.jersey.api.core.ResourceContext;
-import com.sun.jersey.multipart.FormDataParam;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.web.AuthorizableLookup;
import org.apache.nifi.web.NiFiServiceFacade;
@@ -65,33 +88,14 @@ import org.apache.nifi.web.api.request.LongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.sun.jersey.api.core.ResourceContext;
+import com.sun.jersey.multipart.FormDataParam;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Group.
@@ -1981,8 +1985,14 @@ public class ProcessGroupResource extends ApplicationResource {
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_XML);
- // replicate the request
- return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+ // to the cluster nodes themselves.
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ return getRequestReplicator().replicate(HttpMethod.POST, importUri, entity, getHeaders(headersToOverride)).awaitMergedResponse().getResponse();
+ } else {
+ final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+ return getRequestReplicator().replicate(coordinatorNode, HttpMethod.POST, importUri, entity, getHeaders(headersToOverride), false).awaitMergedResponse().getResponse();
+ }
}
// otherwise import the template locally
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
index cbe6cf7..429fb4b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/SystemDiagnosticsResource.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.web.api;
+import java.util.Collections;
+import java.util.Set;
+
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -37,6 +40,7 @@ import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
@@ -125,7 +129,17 @@ public class SystemDiagnosticsResource extends ApplicationResource {
if (isReplicateRequest()) {
// determine where this request should be sent
if (clusterNodeId == null) {
- final NodeResponse nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ final NodeResponse nodeResponse;
+
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
+ // to the cluster nodes themselves.
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ nodeResponse = getRequestReplicator().replicate(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).awaitMergedResponse();
+ } else {
+ final Set<NodeIdentifier> coordinatorNode = Collections.singleton(getClusterCoordinatorNode());
+ nodeResponse = getRequestReplicator().replicate(coordinatorNode, HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), false).awaitMergedResponse();
+ }
+
final SystemDiagnosticsEntity entity = (SystemDiagnosticsEntity) nodeResponse.getUpdatedEntity();
// ensure there is an updated entity (result of merging) and prune the response as necessary
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
deleted file mode 100644
index 56c87e9..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/LockExpiredExceptionMapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.api.config;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
-
-import org.apache.nifi.util.StringUtils;
-import org.apache.nifi.web.concurrent.LockExpiredException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Provider
-public class LockExpiredExceptionMapper implements ExceptionMapper<LockExpiredException> {
- private static final Logger logger = LoggerFactory.getLogger(InvalidRevisionExceptionMapper.class);
-
- @Override
- public Response toResponse(LockExpiredException exception) {
- // log the error
- logger.warn(String.format("%s. Returning %s response.", exception, Response.Status.CONFLICT));
-
- if (logger.isDebugEnabled()) {
- logger.debug(StringUtils.EMPTY, exception);
- }
-
- return Response.status(Response.Status.CONFLICT).entity(exception.getMessage()).type("text/plain").build();
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
new file mode 100644
index 0000000..4b15a70
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.config;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NoClusterCoordinatorExceptionMapper implements ExceptionMapper<NoClusterCoordinatorException> {
+    private static final Logger logger = LoggerFactory.getLogger(NoClusterCoordinatorExceptionMapper.class);
+    /** Converts a NoClusterCoordinatorException into a 503 (Service Unavailable) plain-text response carrying the exception message. */
+    @Override
+    public Response toResponse(final NoClusterCoordinatorException ex) {
+        // log at INFO under this mapper's own logger category
+        logger.info(String.format("Cluster failed processing request: %s. Returning %s response.", ex, Response.Status.SERVICE_UNAVAILABLE));
+        // the stack trace is only emitted when DEBUG is enabled
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, ex);
+        }
+
+        return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(ex.getMessage()).type("text/plain").build();
+    }
+    // NOTE(review): class has no @Provider, unlike the removed LockExpiredExceptionMapper; confirm the Spring bean entry alone registers it.
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 15d2f0f..2e42efb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -163,13 +163,8 @@
<property name="clusterCoordinator" ref="clusterCoordinator"/>
<property name="heartbeatMonitor" ref="heartbeatMonitor" />
<property name="bulletinRepository" ref="bulletinRepository"/>
- <property name="lockManager" ref="lockManager" />
</bean>
- <bean id="lockManager" class="org.apache.nifi.web.concurrent.DistributedReadWriteLock">
- <constructor-arg ref="nifiProperties" />
- </bean>
-
<!-- component ui extension configuration context -->
<bean id="nifiWebConfigurationContext" class="org.apache.nifi.web.StandardNiFiWebConfigurationContext">
<property name="serviceFacade" ref="serviceFacade"/>
@@ -397,12 +392,12 @@
<bean class="org.apache.nifi.web.api.config.IllegalNodeReconnectionExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.IllegalStateExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.InvalidRevisionExceptionMapper" scope="singleton"/>
- <bean class="org.apache.nifi.web.api.config.LockExpiredExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.JsonMappingExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.JsonParseExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.MutableRequestExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.NiFiCoreExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.NoConnectedNodesExceptionMapper" scope="singleton"/>
+ <bean class="org.apache.nifi.web.api.config.NoClusterCoordinatorExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.NoResponseFromNodesExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.NodeDisconnectionExceptionMapper" scope="singleton"/>
<bean class="org.apache.nifi.web.api.config.NodeReconnectionExceptionMapper" scope="singleton"/>
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
deleted file mode 100644
index 608d0c0..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/DistributedReadWriteLock.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-public class DistributedReadWriteLock implements DistributedLockingManager {
- private final DistributedLock readLock;
- private final DistributedLock writeLock;
-
- public DistributedReadWriteLock(final NiFiProperties properties) {
- this(FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.REQUEST_REPLICATION_CLAIM_TIMEOUT,
- NiFiProperties.DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
- }
-
- public DistributedReadWriteLock(final long lockExpirationPeriod, final TimeUnit lockExpirationUnit) {
- final ReadWriteLockSync sync = new ReadWriteLockSync();
- readLock = new ReentrantDistributedLock(LockMode.SHARED, sync, lockExpirationPeriod, lockExpirationUnit);
- writeLock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, lockExpirationPeriod, lockExpirationUnit);
- }
-
- @Override
- public DistributedLock getReadLock() {
- return readLock;
- }
-
- @Override
- public DistributedLock getWriteLock() {
- return writeLock;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
deleted file mode 100644
index a34bb5b..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockInfo.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.TimeUnit;
-
-public class LockInfo {
- private final String versionId;
- private final int lockCount;
- private final LockMode lockMode;
- private final long expirationTime;
-
- public LockInfo(final String versionId, final LockMode lockMode, final int lockCount, final long expirationPeriod, final TimeUnit expirationUnit) {
- this.versionId = versionId;
- this.lockMode = lockMode;
- this.lockCount = lockCount;
- this.expirationTime = System.nanoTime() + expirationUnit.toNanos(expirationPeriod);
- }
-
- public boolean isExpired() {
- return System.nanoTime() > expirationTime;
- }
-
- public String getVersionId() {
- return versionId;
- }
-
- public int getLockCount() {
- return lockCount;
- }
-
- public LockMode getLockMode() {
- return lockMode;
- }
-
- @Override
- public String toString() {
- return "LockInfo[versionId=" + versionId + ", lockMode=" + lockMode + ", lockCount = " + lockCount + ", expired=" + isExpired() + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
deleted file mode 100644
index 2cc5eac..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/LockMode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-public enum LockMode {
-
- SHARED,
-
- MUTUALLY_EXCLUSIVE;
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
deleted file mode 100644
index ef32f8f..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReadWriteLockSync.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-public class ReadWriteLockSync {
- private final AtomicReference<LockInfo> lockInfoRef = new AtomicReference<>();
-
- public LockInfo get() {
- return lockInfoRef.get();
- }
-
- public boolean update(final LockInfo currentLock, final LockInfo updatedLock) {
- return lockInfoRef.compareAndSet(currentLock, updatedLock);
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
deleted file mode 100644
index c51feec..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/concurrent/ReentrantDistributedLock.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReentrantDistributedLock implements DistributedLock {
- private static final Logger logger = LoggerFactory.getLogger(ReentrantDistributedLock.class);
-
- private final long expirationNanos;
-
- private final ReadWriteLockSync sync;
- private final LockMode lockMode;
-
- public ReentrantDistributedLock(final LockMode lockMode, final ReadWriteLockSync sync, final long expirationTimePeriod, final TimeUnit expirationTimeUnit) {
- this.lockMode = lockMode;
- this.sync = sync;
- this.expirationNanos = expirationTimeUnit.toNanos(expirationTimePeriod);
- }
-
- int getClaimCount() {
- final LockInfo currentInfo = sync.get();
- if (currentInfo == null || currentInfo.isExpired()) {
- return 0;
- }
-
- return currentInfo.getLockCount();
- }
-
- @Override
- public String lock() {
- return lock(null);
- }
-
- @Override
- public String lock(final String versionIdentifier) {
- return tryLock(-1L, TimeUnit.MILLISECONDS, versionIdentifier);
- }
-
- @Override
- public String tryLock(final long time, final TimeUnit timeUnit) {
- return tryLock(time, timeUnit, null);
- }
-
- @Override
- public String tryLock(final long timePeriod, final TimeUnit timeUnit, final String versionIdentifier) {
- final long stopTryingTime = timePeriod < 0 ? -1L : System.nanoTime() + timeUnit.toNanos(timePeriod);
- logger.debug("Attempting to obtain {} lock with a max wait of {} {}", lockMode, timePeriod, timeUnit);
-
- long i = 0;
- while (true) {
- if (i++ > 0) {
- if (stopTryingTime > 0L && System.nanoTime() > stopTryingTime) {
- logger.debug("Failed to obtain {} lock within {} {}; returning null for tryLock", lockMode, timePeriod, timeUnit);
- return null;
- }
-
- // If not the first time we've reached this point, we want to
- // give other threads a chance to release their locks before
- // we enter the synchronized block.
- Thread.yield();
- }
-
- synchronized (sync) {
- final LockInfo currentInfo = sync.get();
- logger.trace("Current Lock Info = {}", currentInfo);
-
- if (currentInfo == null || currentInfo.isExpired()) {
- // There is no lock currently held. Attempt to obtain the lock.
- final String versionId = versionIdentifier == null ? UUID.randomUUID().toString() : versionIdentifier;
- final boolean updated = updateLockInfo(currentInfo, versionId, 1);
-
- if (updated) {
- // Lock has been obtained. Return the current version.
- logger.debug("Obtained {} lock with Version ID {}", lockMode, versionId);
- return versionId;
- } else {
- // Try again.
- logger.debug("Failed to update atomic reference. Trying again");
- continue;
- }
- } else {
- // There is already a lock held. If the lock that is being held is SHARED,
- // and this is a SHARED lock, then we can use it.
- if (lockMode == LockMode.SHARED && currentInfo.getLockMode() == LockMode.SHARED) {
- logger.debug("Lock is already held but is a shared lock. Attempting to increment lock count");
-
- // lock being held is a shared lock, and this is a shared lock. We can just
- // update the Lock Info by incrementing the lock count and using a new expiration time.
- final boolean updated = updateLockInfo(currentInfo, currentInfo.getVersionId(), currentInfo.getLockCount() + 1);
- if (updated) {
- // lock info was updated. Return the current version.
- logger.debug("Incremented lock count. Obtained {} lock with Version ID {}", lockMode, currentInfo.getVersionId());
- return currentInfo.getVersionId();
- } else {
- // failed to update the lock info. The lock has expired, so we have to start over.
- logger.debug("Failed to update atomic reference. Trying again");
- continue;
- }
- } else {
- // either the lock being held is a mutex or this lock requires a mutex. Either
- // way, we cannot enter the lock, so we will wait a bit and then retry.
- // We wait before entering synchronized block, because we don't want to overuse
- // the CPU and we want to give other threads a chance to unlock the lock.
- logger.debug("Cannot obtain {} lock because it is already held and cannot be shared. Trying again", lockMode);
- continue;
- }
- }
- }
- }
- }
-
- protected boolean updateLockInfo(final LockInfo currentInfo, final String versionId, final int lockCount) {
- final LockInfo newInfo = new LockInfo(versionId, lockMode, lockCount, expirationNanos, TimeUnit.NANOSECONDS);
- return sync.update(currentInfo, newInfo);
- }
-
- @Override
- public <T> T withLock(final String identifier, final Supplier<T> action) throws LockExpiredException {
- synchronized (sync) {
- verifyIdentifier(identifier, sync.get());
- return action.get();
- }
- }
-
- @Override
- public void unlock(final String identifier) throws LockExpiredException {
- synchronized (sync) {
- final LockInfo info = sync.get();
- verifyIdentifier(identifier, info);
-
- final int newLockCount = info.getLockCount() - 1;
- if (newLockCount <= 0) {
- sync.update(info, null);
- } else {
- sync.update(info, new LockInfo(info.getVersionId(), lockMode, newLockCount, expirationNanos, TimeUnit.NANOSECONDS));
- }
- }
- }
-
- private void verifyIdentifier(final String identifier, final LockInfo lockInfo) throws LockExpiredException {
- if (lockInfo == null) {
- throw new LockExpiredException("No lock has been obtained");
- }
-
- if (!lockInfo.getVersionId().equals(identifier)) {
- throw new LockExpiredException("Incorrect Lock ID provided. This typically means that the lock has already expired and another lock has been obtained.");
- }
-
- if (lockInfo.isExpired()) {
- throw new LockExpiredException("Lock has already expired");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
deleted file mode 100644
index 8027614..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/test/java/org/apache/nifi/web/concurrent/TestReentrantDistributedLock.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestReentrantDistributedLock {
- private ReadWriteLockSync sync;
-
- @Before
- public void setup() {
- sync = new ReadWriteLockSync();
- }
-
- @Test(timeout = 5000)
- public void testMultipleReadLocks() throws LockExpiredException {
- final ReentrantDistributedLock lock = createReadLock();
- final String id1 = lock.lock();
- final String id2 = lock.lock();
- assertEquals(id1, id2);
-
- assertEquals(2, lock.getClaimCount());
- lock.unlock(id1);
- assertEquals(1, lock.getClaimCount());
- lock.unlock(id2);
- assertEquals(0, lock.getClaimCount());
- }
-
- @Test(timeout = 10000)
- public void testMultipleWriteLocksBlock() throws LockExpiredException {
- final ReentrantDistributedLock lock = createWriteLock();
- final String id1 = lock.lock();
- assertNotNull(id1);
-
- final long startTime = System.nanoTime();
- final String id2 = lock.tryLock(500, TimeUnit.MILLISECONDS);
- assertNull(id2);
-
- // We don't know exactly how long it will take to timeout because the time periods
- // won't be exact, but it should take more than 350 milliseconds.
- final long nanos = System.nanoTime() - startTime;
- assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
-
- lock.unlock(id1);
- final String id3 = lock.tryLock(500, TimeUnit.MILLISECONDS);
- assertNotNull(id3);
- assertNotSame(id1, id3);
- lock.unlock(id3);
- }
-
- @Test(timeout = 10000)
- public void testReadLockBlocksWriteLock() throws LockExpiredException {
- final ReentrantDistributedLock readLock = createReadLock();
- final ReentrantDistributedLock writeLock = createWriteLock();
-
- final String id1 = readLock.lock();
- assertNotNull(id1);
-
- final long startTime = System.nanoTime();
- final String id2 = writeLock.tryLock(500, TimeUnit.MILLISECONDS);
- assertNull(id2);
-
- // We don't know exactly how long it will take to timeout because the time periods
- // won't be exact, but it should take more than 350 milliseconds.
- final long nanos = System.nanoTime() - startTime;
- assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
-
- readLock.unlock(id1);
-
- final String id3 = writeLock.lock();
- assertNotNull(id3);
- assertNotSame(id1, id3);
-
- writeLock.unlock(id3);
- }
-
- @Test(timeout = 10000)
- public void testWriteLockBlocksReadLock() throws LockExpiredException {
- final ReentrantDistributedLock readLock = createReadLock();
- final ReentrantDistributedLock writeLock = createWriteLock();
-
- final String id1 = writeLock.lock();
- assertNotNull(id1);
-
- final long startTime = System.nanoTime();
- final String id2 = readLock.tryLock(500, TimeUnit.MILLISECONDS);
- assertNull(id2);
-
- // We don't know exactly how long it will take to timeout because the time periods
- // won't be exact, but it should take more than 350 milliseconds.
- final long nanos = System.nanoTime() - startTime;
- assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(350L));
-
- writeLock.unlock(id1);
-
- final String id3 = readLock.lock();
- assertNotNull(id3);
- assertNotSame(id1, id3);
-
- readLock.unlock(id3);
- }
-
- @Test(timeout = 10000)
- public void testMultipleReadLocksBlockingOnWriteLock() throws InterruptedException, LockExpiredException {
- final ReentrantDistributedLock readLock = createReadLock();
- final ReentrantDistributedLock writeLock = createWriteLock();
-
- final String id1 = writeLock.lock();
- assertNotNull(id1);
-
- final ExecutorService executor = Executors.newFixedThreadPool(3);
- final AtomicReferenceArray<String> array = new AtomicReferenceArray<>(3);
- for (int i = 0; i < 3; i++) {
- final int index = i;
- executor.submit(new Runnable() {
- @Override
- public void run() {
- final String id = readLock.lock();
- assertNotNull(id);
- array.set(index, id);
- }
- });
- }
-
- // wait a bit and then make sure that no values have been set
- Thread.sleep(250L);
- for (int i = 0; i < 3; i++) {
- assertNull(array.get(i));
- }
-
- // unlock so that the readers can lock.
- writeLock.unlock(id1);
-
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
-
- final String id = array.get(0);
- assertNotNull(id);
- for (int i = 0; i < 3; i++) {
- assertEquals(id, array.get(i));
- }
-
- for (int i = 0; i < 3; i++) {
- assertEquals(3 - i, readLock.getClaimCount());
- readLock.unlock(id);
- }
-
- assertEquals(0, readLock.getClaimCount());
- }
-
- @Test(timeout = 10000)
- public void testLockExpires() {
- final ReentrantDistributedLock lock = new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 25, TimeUnit.MILLISECONDS);
- final String id1 = lock.lock();
- assertNotNull(id1);
-
- final long start = System.nanoTime();
- final String id2 = lock.lock();
- final long nanos = System.nanoTime() - start;
-
- assertNotNull(id2);
- assertNotSame(id1, id2);
-
- // The timeout may not entirely elapse but will be close. Give 5 milliseconds buffer
- assertTrue(nanos > TimeUnit.MILLISECONDS.toNanos(20));
- }
-
- @Test(timeout = 10000)
- public void testWithLock() throws LockExpiredException, Exception {
- final ReentrantDistributedLock lock = createWriteLock();
- final String id = lock.lock();
- assertEquals(1, lock.getClaimCount());
-
- final Object obj = new Object();
- final Object returned = lock.withLock(id, () -> obj);
- assertTrue(returned == obj);
- assertEquals(1, lock.getClaimCount());
- lock.unlock(id);
- assertEquals(0, lock.getClaimCount());
- }
-
- private ReentrantDistributedLock createReadLock() {
- return new ReentrantDistributedLock(LockMode.SHARED, sync, 30, TimeUnit.SECONDS);
- }
-
- private ReentrantDistributedLock createWriteLock() {
- return new ReentrantDistributedLock(LockMode.MUTUALLY_EXCLUSIVE, sync, 30, TimeUnit.SECONDS);
- }
-}
[2/2] nifi git commit: NIFI-2185: Proxy requests through the cluster
coordinator rather than making use of distributed read/write locks. This
closes #621
Posted by mc...@apache.org.
NIFI-2185: Proxy requests through the cluster coordinator rather than making use of distributed read/write locks. This closes #621
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cf183e15
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cf183e15
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cf183e15
Branch: refs/heads/master
Commit: cf183e15e3d400a138791ee5ca79287e87ebd932
Parents: b836db2
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jul 8 12:58:06 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Jul 11 08:12:44 2016 -0400
----------------------------------------------------------------------
.../coordination/ClusterCoordinator.java | 25 +++
.../coordination/node/NodeConnectionStatus.java | 8 +-
.../http/replication/RequestReplicator.java | 26 ++-
.../ThreadPoolRequestReplicator.java | 35 ++-
.../node/NodeClusterCoordinator.java | 147 ++++++++++---
.../NoClusterCoordinatorException.java | 31 +++
.../heartbeat/TestAbstractHeartbeatMonitor.java | 28 ++-
.../TestThreadPoolRequestReplicator.java | 17 +-
.../node/TestNodeClusterCoordinator.java | 36 ++--
.../nifi/web/concurrent/DistributedLock.java | 111 ----------
.../concurrent/DistributedLockingManager.java | 71 ------
.../web/concurrent/LockExpiredException.java | 26 ---
.../nifi/web/revision/RevisionManager.java | 7 -
.../apache/nifi/controller/FlowController.java | 24 ++-
.../nifi/controller/StandardFlowService.java | 5 +-
.../nifi/spring/FlowControllerFactoryBean.java | 3 +
.../org/apache/nifi/web/NiFiServiceFacade.java | 100 ---------
.../nifi/web/StandardNiFiContentAccess.java | 12 +-
.../nifi/web/StandardNiFiServiceFacade.java | 49 -----
.../StandardNiFiWebConfigurationContext.java | 112 +++++-----
.../nifi/web/api/ApplicationResource.java | 171 +++++++++------
.../apache/nifi/web/api/CountersResource.java | 63 +++---
.../nifi/web/api/FlowFileQueueResource.java | 68 +++---
.../org/apache/nifi/web/api/FlowResource.java | 74 +++----
.../nifi/web/api/ProcessGroupResource.java | 84 ++++----
.../nifi/web/api/SystemDiagnosticsResource.java | 16 +-
.../api/config/LockExpiredExceptionMapper.java | 44 ----
.../NoClusterCoordinatorExceptionMapper.java | 44 ++++
.../src/main/resources/nifi-web-api-context.xml | 7 +-
.../concurrent/DistributedReadWriteLock.java | 49 -----
.../apache/nifi/web/concurrent/LockInfo.java | 55 -----
.../apache/nifi/web/concurrent/LockMode.java | 26 ---
.../nifi/web/concurrent/ReadWriteLockSync.java | 32 ---
.../concurrent/ReentrantDistributedLock.java | 174 ---------------
.../TestReentrantDistributedLock.java | 216 -------------------
35 files changed, 685 insertions(+), 1311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 7cd9f80..a2e17b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -167,6 +167,17 @@ public interface ClusterCoordinator {
NodeIdentifier getPrimaryNode();
/**
+ * @return the identifier of the node that is elected the active cluster coordinator, or <code>null</code> if
+ * there is no active cluster coordinator elected.
+ */
+ NodeIdentifier getElectedActiveCoordinatorNode();
+
+ /**
+ * @return <code>true</code> if this node has been elected the active cluster coordinator, <code>false</code> otherwise.
+ */
+ boolean isActiveClusterCoordinator();
+
+ /**
* Updates the Flow Service to use for obtaining the current flow
*
* @param flowService the flow service to use for obtaining the current flow
@@ -200,4 +211,18 @@ public interface ClusterCoordinator {
* @return <code>true</code> if connected, <code>false</code> otherwise
*/
boolean isConnected();
+
+ /**
+ * Notifies the cluster coordinator that this node has been granted the given role
+ *
+ * @param clusterRole the role that this node has been granted
+ */
+ void addRole(String clusterRole);
+
+ /**
+ * Notifies the cluster coordinator that this node is no longer responsible for the given role
+ *
+ * @param clusterRole the role that this node is no longer responsible for
+ */
+ void removeRole(String clusterRole);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
index 4570a24..23e509d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
@@ -43,18 +43,11 @@ public class NodeConnectionStatus {
private final Long connectionRequestTime;
private final Set<String> roles;
- public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state) {
- this(nodeId, state, null, null, null, null);
- }
public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final Set<String> roles) {
this(nodeId, state, null, null, null, roles);
}
- public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final long connectionRequestTime) {
- this(nodeId, state, null, null, connectionRequestTime, null);
- }
-
public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) {
this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.name(), null, null);
}
@@ -129,6 +122,7 @@ public class NodeConnectionStatus {
if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) {
sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason());
}
+ sb.append(", roles=").append(getRoles());
sb.append(", updateId=").append(getUpdateIdentifier());
sb.append("]");
return sb.toString();
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index fad7454..6724901 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -27,7 +27,6 @@ public interface RequestReplicator {
public static final String REQUEST_TRANSACTION_ID_HEADER = "X-RequestTransactionId";
public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";
- public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";
/**
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request. The value
@@ -45,6 +44,25 @@ public interface RequestReplicator {
public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
/**
+ * When we replicate a request across the cluster, we replicate it only from the cluster coordinator.
+ * If the request needs to be replicated by another node, it first replicates the request to the coordinator,
+ * which then replicates the request on the node's behalf. This header name and value are used to denote
+ * that the request has already been to the cluster coordinator, and the cluster coordinator is the one replicating
+ * the request. This allows us to know that the request should be serviced, rather than proxied back to the
+ * cluster coordinator.
+ */
+ public static final String REPLICATION_INDICATOR_HEADER = "X-Request-Replicated";
+
+ /**
+ * When replicating a request to the cluster coordinator, it may be useful to denote that the request should
+ * be replicated only to a single node. This happens, for instance, when retrieving a Provenance Event that
+ * we know lives on a specific node. This request must still be replicated through the cluster coordinator.
+ * This header tells the cluster coordinator the UUIDs (comma-separated list, possibly with spaces between)
+ * of the nodes that the request should be replicated to.
+ */
+ public static final String REPLICATION_TARGET_NODE_UUID_HEADER = "X-Replication-Target-Id";
+
+ /**
* Stops the instance from replicating requests. Calling this method on a stopped instance has no effect.
*/
void shutdown();
@@ -54,7 +72,7 @@ public interface RequestReplicator {
* Replicates a request to each node in the cluster. If the request attempts to modify the flow and there is a node
* that is not currently connected, an Exception will be thrown. Otherwise, the returned AsyncClusterResponse object
* will contain the results that are immediately available, as well as an identifier for obtaining an updated result
- * later.
+ * later. NOTE: This method will ALWAYS indicate that the request has been replicated.
*
* @param method the HTTP method (e.g., POST, PUT)
* @param uri the base request URI (up to, but not including, the query string)
@@ -78,10 +96,12 @@ public interface RequestReplicator {
* @param uri the base request URI (up to, but not including, the query string)
* @param entity an entity
* @param headers any HTTP headers
+ * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
+ * has already been replicated, so the receiving node will not replicate the request itself.
*
* @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
*/
- AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers);
+ AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated);
/**
* <p>
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 7a9b56f..f36a4f4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -36,6 +36,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -91,6 +94,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private final ConcurrentMap<String, StandardAsyncClusterResponse> responseMap = new ConcurrentHashMap<>();
private final ConcurrentMap<NodeIdentifier, AtomicInteger> sequentialLongRequestCounts = new ConcurrentHashMap<>();
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
/**
* Creates an instance using a connection timeout and read timeout of 3 seconds
*
@@ -204,14 +211,18 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds);
- return replicate(nodeIdSet, method, uri, entity, headers);
+
+ return replicate(nodeIdSet, method, uri, entity, headers, true);
}
@Override
- public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) {
+ public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, final boolean indicateReplicated) {
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString());
- updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
+
+ if (indicateReplicated) {
+ updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
+ }
// If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request,
// it knows that we are acting as a proxy on behalf of the current user.
@@ -221,7 +232,23 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
}
- return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
+ if (indicateReplicated) {
+ // If we are replicating a request and indicating that it is replicated, then this means that we are
+ // performing an action, rather than simply proxying the request to the cluster coordinator. In this case,
+ // we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
+ // time, so we use a write lock if the request is mutable and a read lock otherwise.
+ final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
+ logger.debug("Obtaining lock {} in order to replicate request {} {}", method, uri);
+ lock.lock();
+ try {
+ logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
+ return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index a90be64..1b410d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -17,6 +17,20 @@
package org.apache.nifi.cluster.coordination.node;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
@@ -49,20 +63,6 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback {
private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
private static final String EVENT_CATEGORY = "Clustering";
@@ -92,10 +92,14 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
senderListener.addHandler(this);
}
+ // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
+ // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
+ // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
+ // seen by other nodes).
@Override
- public void shutdown() {
+ public synchronized void shutdown() {
final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
- notifyOthersOfNodeStatusChange(shutdownStatus);
+ updateNodeStatus(shutdownStatus);
logger.info("Successfully notified other nodes that I am shutting down");
}
@@ -108,6 +112,22 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return nodeId;
}
+    /**
+     * Blocks until the local node identifier is available, polling every 100 milliseconds.
+     * If the thread is interrupted while waiting, the wait continues and the thread's
+     * interrupt status is restored before this method returns, so callers can still observe it.
+     *
+     * @return the local node identifier; never <code>null</code>
+     */
+    private NodeIdentifier waitForLocalNodeIdentifier() {
+        boolean interrupted = false;
+        try {
+            NodeIdentifier localNodeId = getLocalNodeIdentifier();
+            while (localNodeId == null) {
+                try {
+                    Thread.sleep(100L);
+                } catch (final InterruptedException ie) {
+                    // Re-asserting the interrupt right here would make every subsequent sleep() throw
+                    // immediately and turn this loop into a busy spin, so remember it instead.
+                    interrupted = true;
+                }
+
+                localNodeId = getLocalNodeIdentifier();
+            }
+
+            return localNodeId;
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
@Override
public void resetNodeStatuses(final Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap);
@@ -163,7 +183,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn);
}
- updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, System.currentTimeMillis()));
+ updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(nodeId)));
// create the request
final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
@@ -173,6 +193,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
requestReconnectionAsynchronously(request, 10, 5);
}
+    /**
+     * Looks up the cluster roles currently recorded for the given node.
+     *
+     * @param nodeId the node whose roles are wanted
+     * @return the roles from the node's connection status, or an empty set if no status is known
+     */
+    private Set<String> getRoles(final NodeIdentifier nodeId) {
+        final NodeConnectionStatus currentStatus = getConnectionStatus(nodeId);
+        if (currentStatus == null) {
+            return Collections.emptySet();
+        }
+
+        return currentStatus.getRoles();
+    }
+
@Override
public void finishNodeConnection(final NodeIdentifier nodeId) {
final NodeConnectionState state = getConnectionState(nodeId);
@@ -194,12 +219,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
logger.info("{} is now connected", nodeId);
- final boolean updated = updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
- if (!updated) {
- logger.error("Attempting to Finish Node Connection but could not find Node with Identifier {}", nodeId);
- }
+ updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, getRoles(nodeId)));
}
+
@Override
public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
logger.info("Requesting that {} disconnect due to {}", nodeId, explanation == null ? disconnectionCode : explanation);
@@ -245,7 +268,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
nodeStatuses.remove(nodeId);
nodeEvents.remove(nodeId);
- notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
+ notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED, Collections.emptySet()));
}
@Override
@@ -360,6 +383,47 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return null;
}
+ // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
+ // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
+ // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
+ // seen by other nodes).
+    @Override
+    public synchronized void addRole(final String clusterRole) {
+        final NodeIdentifier localId = waitForLocalNodeIdentifier();
+        final NodeConnectionStatus currentStatus = getConnectionStatus(localId);
+
+        // Work on a private copy of the roles; start from an empty set when no status is known yet.
+        final Set<String> updatedRoles = currentStatus == null ? new HashSet<>() : new HashSet<>(currentStatus.getRoles());
+        if (!updatedRoles.add(clusterRole)) {
+            // The role was already present, so there is nothing to update.
+            return;
+        }
+
+        updateNodeRoles(localId, updatedRoles);
+        logger.info("Cluster role {} added. This node is now responsible for the following roles: {}", clusterRole, updatedRoles);
+    }
+
+ // method is synchronized because it modifies local node state and then broadcasts the change. We synchronize any time that this
+ // is done so that we don't have an issue where we create a NodeConnectionStatus, then another thread creates one and sends it
+ // before the first one is sent (as this results in the first status having a larger id, which means that the first status is never
+ // seen by other nodes).
+    @Override
+    public synchronized void removeRole(final String clusterRole) {
+        final NodeIdentifier localId = waitForLocalNodeIdentifier();
+        final NodeConnectionStatus currentStatus = getConnectionStatus(localId);
+
+        // Work on a private copy of the roles; start from an empty set when no status is known yet.
+        final Set<String> remainingRoles = currentStatus == null ? new HashSet<>() : new HashSet<>(currentStatus.getRoles());
+        if (!remainingRoles.remove(clusterRole)) {
+            // The role was not present, so there is nothing to update.
+            return;
+        }
+
+        updateNodeRoles(localId, remainingRoles);
+        logger.info("Cluster role {} removed. This node is now responsible for the following roles: {}", clusterRole, remainingRoles);
+    }
@Override
public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState... states) {
@@ -390,6 +454,31 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
@Override
+    public NodeIdentifier getElectedActiveCoordinatorNode() {
+        // Returns the first connected node whose status carries the Cluster Coordinator role, or null if there is none.
+        final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
+        return connectedNodeIds.stream()
+            .map(nodeId -> getConnectionStatus(nodeId))
+            // A status lookup can come back null (other callers of getConnectionStatus() guard for it),
+            // so such nodes are skipped rather than dereferenced.
+            .filter(status -> status != null && status.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR))
+            .findFirst()
+            .map(status -> status.getNodeIdentifier())
+            .orElse(null);
+    }
+
+    @Override
+    public boolean isActiveClusterCoordinator() {
+        final NodeIdentifier localId = getLocalNodeIdentifier();
+        final NodeConnectionStatus localStatus = localId == null ? null : getConnectionStatus(localId);
+
+        // With no known identifier or no known status, this node is not reported as the coordinator.
+        return localStatus != null && localStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR);
+    }
+
+ @Override
public List<NodeEvent> getNodeEvents(final NodeIdentifier nodeId) {
final CircularFifoQueue<NodeEvent> eventQueue = nodeEvents.get(nodeId);
if (eventQueue == null) {
@@ -426,10 +515,9 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
* if successful, <code>false</code> if no node exists with the given ID
*
* @param status the new status of the node
- * @return <code>true</code> if the node exists and is updated, <code>false</code> if the node does not exist
*/
// visible for testing.
- boolean updateNodeStatus(final NodeConnectionStatus status) {
+ void updateNodeStatus(final NodeConnectionStatus status) {
final NodeIdentifier nodeId = status.getNodeIdentifier();
// In this case, we are using nodeStatuses.put() instead of getting the current value and
@@ -445,8 +533,6 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (currentState == null || currentState != status.getState()) {
notifyOthersOfNodeStatusChange(status);
}
-
- return true;
}
@@ -656,7 +742,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting");
}
- status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, System.currentTimeMillis());
+ status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis(), getRoles(resolvedNodeIdentifier));
updateNodeStatus(status);
DataFlow dataFlow = null;
@@ -741,9 +827,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// disconnect problematic nodes
if (!problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size()) {
- logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node to disconnect from cluster: ", uriPath, problematicNodeResponses));
- for (final NodeResponse nodeResponse : problematicNodeResponses) {
- requestNodeDisconnect(nodeResponse.getNodeId(), DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process URI to " + method + " " + uriPath);
+ final Set<NodeIdentifier> failedNodeIds = problematicNodeResponses.stream().map(response -> response.getNodeId()).collect(Collectors.toSet());
+ logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node disconnect from cluster.", uriPath, failedNodeIds));
+ for (final NodeIdentifier nodeId : failedNodeIds) {
+ requestNodeDisconnect(nodeId, DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process request " + method + " " + uriPath);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
new file mode 100644
index 0000000..89b6722
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Signals that an action cannot be performed because no Cluster Coordinator is currently elected.
+ */
+public class NoClusterCoordinatorException extends ClusterException {
+    private static final long serialVersionUID = -1782098541351698293L;
+
+    private static final String DEFAULT_MESSAGE = "Action cannot be performed because there is currently no Cluster Coordinator elected. "
+        + "The request should be tried again after a moment, after a Cluster Coordinator has been automatically elected.";
+
+    /**
+     * Creates an exception carrying the default explanation, which advises retrying after a moment.
+     */
+    public NoClusterCoordinatorException() {
+        this(DEFAULT_MESSAGE);
+    }
+
+    /**
+     * Creates an exception carrying a caller-supplied explanation.
+     *
+     * @param message the explanation to report in place of the default one
+     */
+    public NoClusterCoordinatorException(final String message) {
+        super(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 7640787..81d72ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -158,7 +158,7 @@ public class TestAbstractHeartbeatMonitor {
private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
- final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state);
+ final NodeConnectionStatus status = new NodeConnectionStatus(nodeId, state, Collections.emptySet());
return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, Collections.emptySet(), 0, 0, 0, 0);
}
@@ -184,7 +184,7 @@ public class TestAbstractHeartbeatMonitor {
@Override
public synchronized void requestNodeConnect(NodeIdentifier nodeId, String userDn) {
- statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING));
+ statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, Collections.emptySet()));
}
@Override
@@ -194,17 +194,17 @@ public class TestAbstractHeartbeatMonitor {
@Override
public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
- statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+ statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
}
@Override
public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
- statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
+ statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet()));
}
@Override
public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
- statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
+ statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, Collections.emptySet()));
}
@Override
@@ -287,6 +287,24 @@ public class TestAbstractHeartbeatMonitor {
@Override
public void setConnected(boolean connected) {
}
+
+ @Override
+ public NodeIdentifier getElectedActiveCoordinatorNode() {
+ return null; // this test implementation never reports an elected coordinator
+ }
+
+ @Override
+ public boolean isActiveClusterCoordinator() {
+ return false; // this test implementation never claims to be the active coordinator
+ }
+
+ @Override
+ public void addRole(String clusterRole) { // intentionally a no-op: the role is not recorded
+ }
+
+ @Override
+ public void removeRole(String clusterRole) { // intentionally a no-op: no role state is kept
+ }
}
public static class ReportedEvent {
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index a9c0af9..2af3e88 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -28,6 +28,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -86,7 +87,7 @@ public class TestThreadPoolRequestReplicator {
final URI uri = new URI("http://localhost:8080/processors/1");
final Entity entity = new ProcessorEntity();
- final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+ final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
// We should get back the same response object
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
@@ -114,7 +115,7 @@ public class TestThreadPoolRequestReplicator {
final URI uri = new URI("http://localhost:8080/processors/1");
final Entity entity = new ProcessorEntity();
- final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+ final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
// We should get back the same response object
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
@@ -150,7 +151,7 @@ public class TestThreadPoolRequestReplicator {
final URI uri = new URI("http://localhost:8080/processors/1");
final Entity entity = new ProcessorEntity();
- final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>());
+ final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true);
assertNotNull(response.awaitMergedResponse(1, TimeUnit.SECONDS));
} , null, 0L, new IllegalArgumentException("Exception created for unit test"));
}
@@ -163,7 +164,7 @@ public class TestThreadPoolRequestReplicator {
nodeIds.add(nodeId);
final ClusterCoordinator coordinator = Mockito.mock(ClusterCoordinator.class);
- Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+ Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenReturn(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
final AtomicInteger requestCount = new AtomicInteger(0);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) {
@@ -190,7 +191,7 @@ public class TestThreadPoolRequestReplicator {
try {
final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
- new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>());
+ new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true);
clusterResponse.awaitMergedResponse();
// Ensure that we received two requests - the first should contain the X-NcmExpects header; the second should not.
@@ -209,7 +210,7 @@ public class TestThreadPoolRequestReplicator {
Mockito.when(coordinator.getConnectionStatus(Mockito.any(NodeIdentifier.class))).thenAnswer(new Answer<NodeConnectionStatus>() {
@Override
public NodeConnectionStatus answer(InvocationOnMock invocation) throws Throwable {
- return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED);
+ return new NodeConnectionStatus(invocation.getArgumentAt(0, NodeIdentifier.class), NodeConnectionState.CONNECTED, Collections.emptySet());
}
});
@@ -234,7 +235,7 @@ public class TestThreadPoolRequestReplicator {
Mockito.when(coordinator.getConnectionStates()).thenReturn(nodeMap);
final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null) {
@Override
- public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers) {
+ public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated) {
return null;
}
};
@@ -307,7 +308,7 @@ public class TestThreadPoolRequestReplicator {
try {
final AsyncClusterResponse clusterResponse = replicator.replicate(nodeIds, HttpMethod.POST,
- new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>());
+ new URI("http://localhost:80/processors/1"), new ProcessorEntity(), new HashMap<>(), true);
clusterResponse.awaitMergedResponse();
Assert.fail("Expected to get an IllegalClusterStateException but did not");
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index f1cccb6..e3d5295 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -89,10 +89,10 @@ public class TestNodeClusterCoordinator {
public void testConnectionResponseIndicatesAllNodes() throws IOException {
// Add a disconnected node
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
// Create a connection request message and send to the coordinator
final NodeIdentifier requestedNodeId = createNodeId(6);
@@ -272,10 +272,10 @@ public class TestNodeClusterCoordinator {
public void testGetConnectionStates() throws IOException {
// Add a disconnected node
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = coordinator.getConnectionStates();
assertEquals(4, stateMap.size());
@@ -302,10 +302,10 @@ public class TestNodeClusterCoordinator {
public void testGetNodeIdentifiers() throws IOException {
// Add a disconnected node
coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(1), DisconnectionCode.LACK_OF_HEARTBEAT));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED));
- coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(2), NodeConnectionState.DISCONNECTING, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(3), NodeConnectionState.CONNECTING, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(4), NodeConnectionState.CONNECTED, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(createNodeId(5), NodeConnectionState.CONNECTED, Collections.emptySet()));
final Set<NodeIdentifier> connectedIds = coordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED);
assertEquals(2, connectedIds.size());
@@ -330,7 +330,7 @@ public class TestNodeClusterCoordinator {
public void testRequestNodeDisconnect() throws InterruptedException {
// Add a connected node
final NodeIdentifier nodeId = createNodeId(1);
- coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatusChangeMessages.isEmpty()) {
@@ -356,8 +356,8 @@ public class TestNodeClusterCoordinator {
final NodeIdentifier nodeId1 = createNodeId(1);
final NodeIdentifier nodeId2 = createNodeId(2);
- coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
- coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatusChangeMessages.size() < 2) {
@@ -381,7 +381,7 @@ public class TestNodeClusterCoordinator {
assertEquals(NodeConnectionState.CONNECTED, curStatus.getState());
// Verify that resetMap updates only the newer statuses
- final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING);
+ final NodeConnectionStatus node2Disconnecting = new NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING, Collections.emptySet());
final Map<NodeIdentifier, NodeConnectionStatus> resetMap = new HashMap<>();
resetMap.put(nodeId1, oldStatus);
resetMap.put(nodeId2, node2Disconnecting);
@@ -398,14 +398,14 @@ public class TestNodeClusterCoordinator {
final NodeIdentifier nodeId1 = createNodeId(1);
final NodeIdentifier nodeId2 = createNodeId(2);
- coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId1, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatusChangeMessages.isEmpty()) {
Thread.sleep(10L);
}
nodeStatusChangeMessages.clear();
- coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED));
+ coordinator.updateNodeStatus(new NodeConnectionStatus(nodeId2, NodeConnectionState.CONNECTED, Collections.emptySet()));
// wait for the status change message and clear it
while (nodeStatusChangeMessages.isEmpty()) {
Thread.sleep(10L);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
deleted file mode 100644
index b5c974f..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLock.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-public interface DistributedLock {
-
- /**
- * Obtains a lock, blocking as long as necessary to obtain the lock.
- * Once a lock has been obtained, the identifier of the version of the lock is returned,
- * which can be passed to the {@link #withLock(String, Supplier)} or
- * {@link #unlock(String)} method. Once this method returns, it is
- * important that either the {@link #withLock(String, Supplier)} or
- * {@link #unlock(String)} method be called with this identifier. Otherwise,
- * any attempt to claim another read lock or write lock will block until this
- * lock expires.
- *
- * @return the identifier
- */
- String lock();
-
- /**
- * Obtains a lock, blocking as long as necessary to obtain the lock.
- * Once a lock has been obtained, the identifier of the version of the lock is returned,
- * which can be passed to the {@link #withLock(String, Supplier)} or
- * {@link #unlock(String)} method. Once this method returns, it is
- * important that either the {@link #withLock(String, Supplier)} or
- * {@link #unlock(String)} method be called with this identifier. Otherwise,
- * any attempt to claim another read lock or write lock will block until this
- * lock expires.
- *
- * @param versionIdentifier a value that should be used as the version id instead of generating one.
- * This allows us to ensure that all nodes in the cluster use the same id.
- *
- * @return the identifier
- */
- String lock(String versionIdentifier);
-
- /**
- * Waits up to the given amount of time to obtain a lock. If the lock is obtained
- * within this time period, the identifier will be returned, as with {@link #lock()}.
- * If the lock cannot be obtained within the given time period, <code>null</code> will
- * be returned.
- *
- * @param time the maximum amount of time to wait for the lock
- * @param timeUnit the unit of time that the time parameter is in
- * @return the identifier of the lock, or <code>null</code> if no lock is obtained
- */
- String tryLock(long time, TimeUnit timeUnit);
-
- /**
- * Waits up to the given amount of time to obtain a lock. If the lock is obtained
- * within this time period, the identifier will be returned, as with {@link #lock()}.
- * If the lock cannot be obtained within the given time period, <code>null</code> will
- * be returned.
- *
- * @param time the maximum amount of time to wait for the lock
- * @param timeUnit the unit of time that the time parameter is in
- * @param versionIdentifier a value that should be used as the version id instead of generating one.
- * This allows us to ensure that all nodes in the cluster use the same id.
- * @return the identifier of the lock, or <code>null</code> if no lock is obtained
- */
- String tryLock(long time, TimeUnit timeUnit, String versionIdentifier);
-
- /**
- * Performs the given action while this lock is held. The identifier of the lock that was
- * obtained by calling {@link #lock()} must be provided to this method. If the
- * lock identifier is incorrect, or the lock has expired, a {@link LockExpiredException}
- * will be thrown. This method provides a mechanism for verifying that the lock obtained
- * by {@link #lock()} or {@link #tryLock(long, TimeUnit)} is still valid and that the action
- * being performed will be done so without the lock expiring (i.e., if the lock expires while
- * the action is being performed, the lock won't be released until the provided action completes).
- *
- * @param identifier the identifier of the lock that has already been obtained
- * @param action the action to perform
- *
- * @return the value returned by the given action
- *
- * @throws LockExpiredException if the provided identifier is not the identifier of the currently
- * held lock, or if the lock that was obtained has already expired and is no longer valid
- */
- <T> T withLock(String identifier, Supplier<T> action) throws LockExpiredException;
-
- /**
- * Cancels the lock with the given identifier, so that the lock is no longer valid.
- *
- * @param identifier the identifier of the lock that was obtained by calling {@link #lock()}.
- *
- * @throws LockExpiredException if the provided identifier is not the identifier of the currently
- * held lock, or if the lock that was obtained has already expired and is no longer valid
- */
- void unlock(String identifier) throws LockExpiredException;
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
deleted file mode 100644
index 09c1d5d..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/DistributedLockingManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-/**
- * <p>
- * A DistributedLockingManager is responsible for exposing a mechanism that
- * clients can use to obtain a lock on the dataflow.
- * </p>
- *
- * <p>
- * Because of the way in which NiFi replicates requests from one node to all
- * other nodes in the cluster, it is important that all nodes in the cluster
- * are able to obtain a lock for the request before the request is allowed to
- * proceed. This is accomplished by using a two-phase approach. For each request
- * that will require a lock (either a read (shared) lock or a write (mutually
- * exclusive) lock), the request must be done in two phases. The first phase is
- * responsible for obtaining the lock and optionally performing validation of
- * the request. Once a node has obtained the necessary lock and performed any
- * required validation, the node will respond to the web request with a status
- * code of 150 - NodeContinue.
- * </p>
- *
- * <p>
- * At this point, the node that originated the request
- * will verify that either all nodes obtained a lock or that at least one node
- * failed to obtain a lock. If all nodes respond with a 150 - NodeContinue,
- * then the second phase of the request will occur. In the second phase, the
- * actual logic of the desired request is performed while the lock is held.
- * The lock is then released, once the logic is performed (or if the logic fails
- * to be performed).
- * </p>
- *
- * <p>
- * In the case that at least one node responds with a status code other than
- * 150 - NodeContinue, the node that originated the request will instead issue
- * a cancel request for the second phase so that all nodes are able to unlock
- * the lock that was previously obtained for the request.
- * </p>
- *
- * <p>
- * A key consideration in this type of approach that must be taken into account
- * is that the node that originated the request could, at any point in time, fail
- * as a result of the process being killed, power loss, network connectivity problems,
- * etc. As a result, the locks that are obtained through a DistributedLockingManager
- * are designed to expire after some amount of time, so that locks are not held
- * indefinitely, even in the case of node failure.
- * </p>
- */
-public interface DistributedLockingManager {
-
- DistributedLock getReadLock();
-
- DistributedLock getWriteLock();
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
deleted file mode 100644
index c673c9f..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/concurrent/LockExpiredException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.web.concurrent;
-
-public class LockExpiredException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- public LockExpiredException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
index 6b6bc45..e10454f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/web/revision/RevisionManager.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.concurrent.DistributedLockingManager;
/**
@@ -55,12 +54,6 @@ import org.apache.nifi.web.concurrent.DistributedLockingManager;
* request may continue, this means that all nodes have agreed that the client's Revisions are
* acceptable.
* </p>
- *
- * <p>
- * To ensure that the revisions remain consistent between the time that they are validated and
- * the time that the modification takes place, it is important that the revisions always be
- * validated while an appropriate read or write lock is held, via the {@link DistributedLockingManager}.
- * </p>
*/
public interface RevisionManager {
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8c710fa..0c90c50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -65,6 +65,7 @@ import org.apache.nifi.authorization.resource.ProvenanceEventAuthorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@@ -305,6 +306,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final List<Connectable> startConnectablesAfterInitialization;
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
private final LeaderElectionManager leaderElectionManager;
+ private final ClusterCoordinator clusterCoordinator;
/**
* true if controller is configured to operate in a clustered environment
@@ -383,6 +385,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/* configuredForClustering */ false,
/* NodeProtocolSender */ null,
bulletinRepo,
+ /* cluster coordinator */ null,
/* heartbeat monitor */ null);
}
@@ -394,6 +397,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final StringEncryptor encryptor,
final NodeProtocolSender protocolSender,
final BulletinRepository bulletinRepo,
+ final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
@@ -404,6 +408,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/* configuredForClustering */ true,
protocolSender,
bulletinRepo,
+ clusterCoordinator,
heartbeatMonitor);
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.getRemoteInputHttpPort(), properties.isSiteToSiteSecure());
@@ -420,6 +425,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final boolean configuredForClustering,
final NodeProtocolSender protocolSender,
final BulletinRepository bulletinRepo,
+ final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor) {
maxTimerDrivenThreads = new AtomicInteger(10);
@@ -430,6 +436,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.heartbeatMonitor = heartbeatMonitor;
sslContext = SslContextFactory.createSslContext(properties, false);
extensionManager = new ExtensionManager();
+ this.clusterCoordinator = clusterCoordinator;
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
@@ -3181,11 +3188,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public void onLeaderRelinquish() {
heartbeatMonitor.stop();
+
+ if (clusterCoordinator != null) {
+ clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR);
+ }
}
@Override
public void onLeaderElection() {
heartbeatMonitor.start();
+
+ if (clusterCoordinator != null) {
+ clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR);
+ }
}
});
}
@@ -3819,7 +3834,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return null;
}
- final Set<String> roles = bean.isPrimary() ? Collections.singleton(ClusterRoles.PRIMARY_NODE) : Collections.emptySet();
+ final Set<String> roles = new HashSet<>();
+ if (bean.isPrimary()) {
+ roles.add(ClusterRoles.PRIMARY_NODE);
+ }
+ if (clusterCoordinator.isActiveClusterCoordinator()) {
+ roles.add(ClusterRoles.CLUSTER_COORDINATOR);
+ }
+
final Heartbeat heartbeat = new Heartbeat(nodeId, roles, bean.getConnectionStatus(), hbPayload.marshal());
final HeartbeatMessage message = new HeartbeatMessage();
message.setHeartbeat(heartbeat);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index e2bdcf0..d5d40b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -83,6 +83,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -867,7 +868,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setClustered(true, response.getInstanceId(), response.getCoordinatorDN());
controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.getManagerRemoteInputHttpPort(), response.isManagerRemoteCommsSecure());
- controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
+ final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
+ final Set<String> roles = status == null ? Collections.emptySet() : status.getRoles();
+ controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED, roles));
// start the processors as indicated by the dataflow
controller.onFlowInitialized(autoResumeState);
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index ecec0a3..7de36c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -18,6 +18,7 @@ package org.apache.nifi.spring;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.controller.FlowController;
@@ -52,6 +53,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
if (properties.isNode()) {
final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class);
+ final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
flowController = FlowController.createClusteredInstance(
flowFileEventRepository,
properties,
@@ -60,6 +62,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
encryptor,
nodeProtocolSender,
bulletinRepository,
+ clusterCoordinator,
heartbeatMonitor);
} else {
flowController = FlowController.createStandaloneInstance(
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 2d093ce..b8d9cfd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
-import java.util.function.Supplier;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ScheduledState;
@@ -98,7 +97,6 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
-import org.apache.nifi.web.concurrent.LockExpiredException;
/**
* Defines the NiFiServiceFacade interface.
@@ -117,104 +115,6 @@ public interface NiFiServiceFacade {
void authorizeAccess(AuthorizeAccess authorizeAccess);
/**
- * Obtains a read (shared) lock for the entire flow, so that no other
- * requests can be made to modify the flow until either this read lock
- * is released via {@link #releaseReadLock()} or the lock expires
- *
- * @return an identifier that indicates the version of the lock, so that other
- * requests cannot release a lock that was held by this request
- */
- String obtainReadLock();
-
- /**
- * Obtains a read (shared) lock for the entire flow, so that no other
- * requests can be made to modify the flow until either this read lock
- * is released via {@link #releaseReadLock()} or the lock expires
- *
- * @param versionId specifies a value to use for the Version ID for the lock
- *
- * @return an identifier that indicates the version of the lock, so that other
- * requests cannot release a lock that was held by this request
- */
- String obtainReadLock(String versionId);
-
- /**
- * Performs the given action while holding the read lock that has already been obtained
- * with the given versionIdentifier. This allows the given action to be performed without
- * allowing the read lock to expire until the entire action has completed.
- *
- * @param versionIdentifier the identifier that indicates the version of the lock that
- * is held. The value that is to be passed here is the value that was returned from the
- * call to {@link #obtainReadLock()}.
- * @param action the action to perform
- *
- * @return the value returned by the action
- * @throws LockExpiredException if the lock has expired before the action is invoked
- * @throws Exception any Exception thrown by the given action is propagated
- */
- <T> T withReadLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
-
- /**
- * Releases the read lock held on this flow
- *
- * @param versionIdentifier the identifier that indicates the version of the lock that
- * is held. The value that is to be passed here is the value that was returned from the
- * call to {@link #obtainReadLock()}.
- *
- * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
- */
- void releaseReadLock(String versionIdentifier) throws LockExpiredException;
-
- /**
- * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
- * requests can be made to read or modify the flow until either this write lock
- * is released via {@link #releaseWriteLock()} or the lock expires
- *
- * @return an identifier that indicates the version of the lock, so that other
- * requests cannot release a lock that was held by this request
- */
- String obtainWriteLock();
-
- /**
- * Obtains a write (mutually exclusive) lock for the entire flow, so that no other
- * requests can be made to read or modify the flow until either this write lock
- * is released via {@link #releaseWriteLock()} or the lock expires
- *
- * @param versionId specifies a value to use for the Version ID for the lock
- *
- * @return an identifier that indicates the version of the lock, so that other
- * requests cannot release a lock that was held by this request
- */
- String obtainWriteLock(String versionId);
-
- /**
- * Performs the given action while holding the write lock that has already been obtained
- * with the given versionIdentifier. This allows the given action to be performed without
- * allowing the write lock to expire until the entire action has completed.
- *
- * @param versionIdentifier the identifier that indicates the version of the lock that
- * is held. The value that is to be passed here is the value that was returned from the
- * call to {@link #obtainWriteLock()}.
- * @param action the action to perform
- *
- * @return the value returned by the action
- * @throws LockExpiredException if the lock has expired before the action is invoked
- * @throws Exception any Exception thrown by the given action is propagated
- */
- <T> T withWriteLock(String versionIdentifier, Supplier<T> action) throws LockExpiredException;
-
- /**
- * Releases the write lock held on the flow
- *
- * @param versionIdentifier the identifier that indicates the version of the lock that
- * is held. The value that is to be passed here is the value that was returned from the
- * call to {@link #obtainWriteLock()}.
- *
- * @throws LockExpiredException if the lock with the given identifier has already expired or is not valid
- */
- void releaseWriteLock(String versionIdentifier) throws LockExpiredException;
-
- /**
* Claims the specified revision for the specified user.
*
* @param revision revision
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index 8a6b438..f1c47a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -25,6 +25,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
+import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.util.NiFiProperties;
@@ -87,12 +88,17 @@ public class StandardNiFiContentAccess implements ContentAccess {
// get the target node and ensure it exists
final NodeIdentifier nodeId = clusterCoordinator.getNodeIdentifier(request.getClusterNodeId());
- final Set<NodeIdentifier> targetNodes = Collections.singleton(nodeId);
- // replicate the request to the specific node
+ // replicate the request to the cluster coordinator, indicating the target node
NodeResponse nodeResponse;
try {
- nodeResponse = requestReplicator.replicate(targetNodes, HttpMethod.GET, dataUri, parameters, headers).awaitMergedResponse();
+ headers.put(RequestReplicator.REPLICATION_TARGET_NODE_UUID_HEADER, nodeId.getId());
+ final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
+ if (coordinatorNode == null) {
+ throw new NoClusterCoordinatorException();
+ }
+ final Set<NodeIdentifier> coordinatorNodes = Collections.singleton(coordinatorNode);
+ nodeResponse = requestReplicator.replicate(coordinatorNodes, HttpMethod.GET, dataUri, parameters, headers, false).awaitMergedResponse();
} catch (InterruptedException e) {
throw new IllegalClusterStateException("Interrupted while waiting for a response from node");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cf183e15/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 3526f32..490a9bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -183,8 +183,6 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
-import org.apache.nifi.web.concurrent.DistributedLockingManager;
-import org.apache.nifi.web.concurrent.LockExpiredException;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
@@ -258,54 +256,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
private Authorizer authorizer;
private AuthorizableLookup authorizableLookup;
- private DistributedLockingManager lockManager;
// -----------------------------------------
// Synchronization methods
// -----------------------------------------
@Override
- public String obtainReadLock() {
- return lockManager.getReadLock().lock();
- }
-
- @Override
- public String obtainReadLock(final String versionIdSeed) {
- return lockManager.getReadLock().lock(versionIdSeed);
- }
-
- @Override
- public <T> T withReadLock(final String versionIdentifier, final Supplier<T> action) throws LockExpiredException {
- return lockManager.getReadLock().withLock(versionIdentifier, action);
- }
-
- @Override
- public void releaseReadLock(final String versionIdentifier) throws LockExpiredException {
- lockManager.getReadLock().unlock(versionIdentifier);
- }
-
- @Override
- public String obtainWriteLock() {
- return lockManager.getWriteLock().lock();
- }
-
- @Override
- public String obtainWriteLock(final String versionIdSeed) {
- return lockManager.getWriteLock().lock(versionIdSeed);
- }
-
- @Override
- public <T> T withWriteLock(final String versionIdentifier, final Supplier<T> action) throws LockExpiredException {
- return lockManager.getWriteLock().withLock(versionIdentifier, action);
- }
-
- @Override
- public void releaseWriteLock(final String versionIdentifier) throws LockExpiredException {
- lockManager.getWriteLock().unlock(versionIdentifier);
- }
-
-
-
- @Override
public void authorizeAccess(final AuthorizeAccess authorizeAccess) {
authorizeAccess.authorize(authorizableLookup);
}
@@ -2900,8 +2855,4 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}
-
- public void setLockManager(final DistributedLockingManager lockManager) {
- this.lockManager = lockManager;
- }
}