You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/14 23:47:20 UTC
nifi git commit: NIFI-730: - Adding emptying a queue when clustered.
Repository: nifi
Updated Branches:
refs/heads/NIFI-730 09a3f6dad -> 39a050d2f
NIFI-730:
- Adding emptying a queue when clustered.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/39a050d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/39a050d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/39a050d2
Branch: refs/heads/NIFI-730
Commit: 39a050d2fdc2437576d860b968ab1ec78d18fa21
Parents: 09a3f6d
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Oct 14 17:47:06 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Oct 14 17:47:06 2015 -0400
----------------------------------------------------------------------
.../controller/queue/DropFlowFileState.java | 18 ++-
.../cluster/manager/impl/WebClusterManager.java | 111 +++++++++++++++++--
.../apache/nifi/web/api/ConnectionResource.java | 26 ++++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 12 +-
4 files changed, 144 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
index 12dbedf..32efcbb 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.controller.queue;
/**
@@ -38,4 +37,21 @@ public enum DropFlowFileState {
public String toString() {
return description;
}
+
+ /**
+ * @param description string form of drop flow file state
+ * @return the matching DropFlowFileState or null if the description doesn't match
+ */
+ public static DropFlowFileState valueOfDescription(String description) {
+ DropFlowFileState desiredState = null;
+
+ for (DropFlowFileState state : values()) {
+ if (state.toString().equals(description)) {
+ desiredState = state;
+ break;
+ }
+ }
+
+ return desiredState;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index bfeec7a..6b0bb64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -226,14 +226,17 @@ import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
@@ -316,6 +319,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+ public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
+ public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
+
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
private final HttpResponseMapper httpResponseMapper;
@@ -1090,7 +1096,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// Register log observer to provide bulletins when reporting task logs anything at WARN level or above
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
- new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+ new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
return taskNode;
}
@@ -1385,7 +1391,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// Register log observer to provide bulletins when reporting task logs anything at WARN level or above
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
- new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+ new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
return serviceNode;
}
@@ -2465,6 +2471,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return false;
}
+ private static boolean isDropRequestEndpoint(final URI uri, final String method) {
+ if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) {
+ return true;
+ } else if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) {
+ return true;
+ }
+
+ return false;
+ }
+
static boolean isResponseInterpreted(final URI uri, final String method) {
return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
|| isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
@@ -2472,7 +2488,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|| isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
|| isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
|| isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
- || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method);
+ || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method)
+ || isDropRequestEndpoint(uri, method);
}
private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2808,6 +2825,62 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return normalizedValidationErrors;
}
+ /**
+ * Merges the drop requests in the specified map into the specified drop request.
+ *
+ * @param dropRequest the target drop request
+ * @param dropRequestMap the mapping of all responses being merged
+ */
+ private void mergeDropRequests(final DropRequestDTO dropRequest, final Map<NodeIdentifier, DropRequestDTO> dropRequestMap) {
+ boolean nodeWaiting = false;
+ int originalCount = 0;
+ long originalSize = 0;
+ int currentCount = 0;
+ long currentSize = 0;
+ int droppedCount = 0;
+ long droppedSize = 0;
+
+ DropFlowFileState state = null;
+ for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : dropRequestMap.entrySet()) {
+ final DropRequestDTO nodeDropRequest = nodeEntry.getValue();
+
+ currentCount += nodeDropRequest.getCurrentCount();
+ currentSize += nodeDropRequest.getCurrentSize();
+ droppedCount += nodeDropRequest.getDroppedCount();
+ droppedSize += nodeDropRequest.getDroppedSize();
+
+ if (nodeDropRequest.getOriginalCount() == null) {
+ nodeWaiting = true;
+ } else {
+ originalCount += nodeDropRequest.getOriginalCount();
+ originalSize += nodeDropRequest.getOriginalSize();
+ }
+
+ final DropFlowFileState nodeState = DropFlowFileState.valueOfDescription(nodeDropRequest.getState());
+ if (state == null || state.compareTo(nodeState) > 0) {
+ state = nodeState;
+ }
+ }
+
+ dropRequest.setCurrentCount(currentCount);
+ dropRequest.setCurrentSize(currentSize);
+ dropRequest.setCurrent(FormatUtils.formatCount(currentCount) + " / " + FormatUtils.formatDataSize(currentSize));
+
+ dropRequest.setDroppedCount(droppedCount);
+ dropRequest.setDroppedSize(droppedSize);
+ dropRequest.setDropped(FormatUtils.formatCount(droppedCount) + " / " + FormatUtils.formatDataSize(droppedSize));
+
+ if (!nodeWaiting) {
+ dropRequest.setOriginalCount(originalCount);
+ dropRequest.setOriginalSize(originalSize);
+ dropRequest.setOriginal(FormatUtils.formatCount(originalCount) + " / " + FormatUtils.formatDataSize(originalSize));
+ }
+
+ if (state != null) {
+ dropRequest.setState(state.toString());
+ }
+ }
+
// requires write lock to be already acquired unless request is not mutable
private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
// holds the one response of all the node responses to return to the client
@@ -3158,8 +3231,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
- nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ final ControllerServiceReferencingComponentsEntity nodeResponseEntity
+ = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@@ -3218,6 +3291,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isDropRequestEndpoint(uri, method)) {
+ final DropRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(DropRequestEntity.class);
+ final DropRequestDTO dropRequest = responseEntity.getDropRequest();
+
+ final Map<NodeIdentifier, DropRequestDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final DropRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(DropRequestEntity.class);
+ final DropRequestDTO nodeDropRequest = nodeResponseEntity.getDropRequest();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeDropRequest);
+ }
+ mergeDropRequests(dropRequest, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
@@ -3270,12 +3361,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
/**
- * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and
- * is not comprised of responses from all nodes in the cluster (at least one node contained the counter in question).
+ * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least
+ * one node contained the counter in question).
*
- * @param problematicNodeResponses The problematic node responses
- * @param uri The URI for the request
- * @return Whether all problematic node responses were due to a missing counter
+ * @param problematicNodeResponses The problematic node responses
+ * @param uri The URI for the request
+ * @return Whether all problematic node responses were due to a missing counter
*/
private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) {
if (isCountersEndpoint(uri)) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index b170d39..6741348 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -894,6 +894,7 @@ public class ConnectionResource extends ApplicationResource {
/**
* Drops the flowfiles in the queue of the specified connection.
*
+ * @param httpServletRequest request
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the connection
* @return A dropRequestEntity
@@ -920,6 +921,7 @@ public class ConnectionResource extends ApplicationResource {
}
)
public Response dropQueueContents(
+ @Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
@@ -936,6 +938,12 @@ public class ConnectionResource extends ApplicationResource {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
+ // handle expects request (usually from the cluster manager)
+ final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+ if (expects != null) {
+ return generateContinueResponse().build();
+ }
+
// ensure the id is the same across the cluster
final String dropRequestId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
@@ -947,7 +955,7 @@ public class ConnectionResource extends ApplicationResource {
// submit the drop request
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id, dropRequestId);
- dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequest.getId()));
+ dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "drop-requests", dropRequest.getId()));
// create the revision
final RevisionDTO revision = new RevisionDTO();
@@ -978,7 +986,7 @@ public class ConnectionResource extends ApplicationResource {
@GET
@Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+ @Path("/{connection-id}/drop-requests/{drop-request-id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Gets the current status of a drop request for the specified connection.",
@@ -1020,7 +1028,7 @@ public class ConnectionResource extends ApplicationResource {
// get the drop request
final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(groupId, connectionId, dropRequestId);
- dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
+ dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "drop-requests", dropRequestId));
// create the revision
final RevisionDTO revision = new RevisionDTO();
@@ -1037,6 +1045,7 @@ public class ConnectionResource extends ApplicationResource {
/**
* Deletes the specified drop request.
*
+ * @param httpServletRequest request
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param connectionId The connection id
* @param dropRequestId The drop request id
@@ -1045,7 +1054,7 @@ public class ConnectionResource extends ApplicationResource {
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+ @Path("/{connection-id}/drop-requests/{drop-request-id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Cancels and/or removes a request drop of the contents in this connection.",
@@ -1064,6 +1073,7 @@ public class ConnectionResource extends ApplicationResource {
}
)
public Response removeDropRequest(
+ @Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
@@ -1085,9 +1095,15 @@ public class ConnectionResource extends ApplicationResource {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
+ // handle expects request (usually from the cluster manager)
+ final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+ if (expects != null) {
+ return generateContinueResponse().build();
+ }
+
// delete the drop request
final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId);
- dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
+ dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "drop-requests", dropRequestId));
// create the revision
final RevisionDTO revision = new RevisionDTO();
http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 264268b..0758ce2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -315,6 +315,11 @@ public final class DtoFactory {
dto.setDroppedSize(dropped.getByteCount());
dto.setDropped(FormatUtils.formatCount(dropped.getObjectCount()) + " / " + FormatUtils.formatDataSize(dropped.getByteCount()));
+ final QueueSize current = dropRequest.getCurrentSize();
+ dto.setCurrentCount(current.getObjectCount());
+ dto.setCurrentSize(current.getByteCount());
+ dto.setCurrent(FormatUtils.formatCount(current.getObjectCount()) + " / " + FormatUtils.formatDataSize(current.getByteCount()));
+
if (dropRequest.getOriginalSize() != null) {
final QueueSize original = dropRequest.getOriginalSize();
dto.setOriginalCount(original.getObjectCount());
@@ -326,13 +331,6 @@ public final class DtoFactory {
dto.setPercentCompleted(0);
}
- if (dropRequest.getCurrentSize() != null) {
- final QueueSize current = dropRequest.getCurrentSize();
- dto.setCurrentCount(current.getObjectCount());
- dto.setCurrentSize(current.getByteCount());
- dto.setCurrent(FormatUtils.formatCount(current.getObjectCount()) + " / " + FormatUtils.formatDataSize(current.getByteCount()));
- }
-
return dto;
}