You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/02 20:32:45 UTC

[21/30] nifi git commit: NIFI-730: - Adding emptying a queue when clustered.

NIFI-730:
- Adding emptying a queue when clustered.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/39a050d2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/39a050d2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/39a050d2

Branch: refs/heads/master
Commit: 39a050d2fdc2437576d860b968ab1ec78d18fa21
Parents: 09a3f6d
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Oct 14 17:47:06 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Oct 14 17:47:06 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileState.java     |  18 ++-
 .../cluster/manager/impl/WebClusterManager.java | 111 +++++++++++++++++--
 .../apache/nifi/web/api/ConnectionResource.java |  26 ++++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  12 +-
 4 files changed, 144 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
index 12dbedf..32efcbb 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.controller.queue;
 
 /**
@@ -38,4 +37,21 @@ public enum DropFlowFileState {
     public String toString() {
         return description;
     }
+
+    /**
+     * @param description string form of drop flow file state
+     * @return the matching DropFlowFileState or null if the description doesn't match
+     */
+    public static DropFlowFileState valueOfDescription(String description) {
+        DropFlowFileState desiredState = null;
+
+        for (DropFlowFileState state : values()) {
+            if (state.toString().equals(description)) {
+                desiredState = state;
+                break;
+            }
+        }
+
+        return desiredState;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index bfeec7a..6b0bb64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -226,14 +226,17 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.nifi.controller.queue.DropFlowFileState;
 
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 
@@ -316,6 +319,9 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
     public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
 
+    public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
+    public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
+
     private final NiFiProperties properties;
     private final HttpRequestReplicator httpRequestReplicator;
     private final HttpResponseMapper httpResponseMapper;
@@ -1090,7 +1096,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
         final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
         logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
-            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+                new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
 
         return taskNode;
     }
@@ -1385,7 +1391,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
         final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
         logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
-            new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+                new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
 
         return serviceNode;
     }
@@ -2465,6 +2471,16 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
+    private static boolean isDropRequestEndpoint(final URI uri, final String method) {
+        if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        } else if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && DROP_REQUEST_URI.matcher(uri.getPath()).matches()) {
+            return true;
+        }
+
+        return false;
+    }
+
     static boolean isResponseInterpreted(final URI uri, final String method) {
         return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
                 || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
@@ -2472,7 +2488,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
                 || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
                 || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
-                || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method);
+                || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method)
+                || isDropRequestEndpoint(uri, method);
     }
 
     private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@@ -2808,6 +2825,62 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return normalizedValidationErrors;
     }
 
+    /**
+     * Merges the drop requests in the specified map into the specified drop request.
+     *
+     * @param dropRequest the target drop request
+     * @param dropRequestMap the mapping of all responses being merged
+     */
+    private void mergeDropRequests(final DropRequestDTO dropRequest, final Map<NodeIdentifier, DropRequestDTO> dropRequestMap) {
+        boolean nodeWaiting = false;
+        int originalCount = 0;
+        long originalSize = 0;
+        int currentCount = 0;
+        long currentSize = 0;
+        int droppedCount = 0;
+        long droppedSize = 0;
+
+        DropFlowFileState state = null;
+        for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : dropRequestMap.entrySet()) {
+            final DropRequestDTO nodeDropRequest = nodeEntry.getValue();
+
+            currentCount += nodeDropRequest.getCurrentCount();
+            currentSize += nodeDropRequest.getCurrentSize();
+            droppedCount += nodeDropRequest.getDroppedCount();
+            droppedSize += nodeDropRequest.getDroppedSize();
+
+            if (nodeDropRequest.getOriginalCount() == null) {
+                nodeWaiting = true;
+            } else {
+                originalCount += nodeDropRequest.getOriginalCount();
+                originalSize += nodeDropRequest.getOriginalSize();
+            }
+
+            final DropFlowFileState nodeState = DropFlowFileState.valueOfDescription(nodeDropRequest.getState());
+            if (state == null || state.compareTo(nodeState) > 0) {
+                state = nodeState;
+            }
+        }
+
+        dropRequest.setCurrentCount(currentCount);
+        dropRequest.setCurrentSize(currentSize);
+        dropRequest.setCurrent(FormatUtils.formatCount(currentCount) + " / " + FormatUtils.formatDataSize(currentSize));
+
+        dropRequest.setDroppedCount(droppedCount);
+        dropRequest.setDroppedSize(droppedSize);
+        dropRequest.setDropped(FormatUtils.formatCount(droppedCount) + " / " + FormatUtils.formatDataSize(droppedSize));
+
+        if (!nodeWaiting) {
+            dropRequest.setOriginalCount(originalCount);
+            dropRequest.setOriginalSize(originalSize);
+            dropRequest.setOriginal(FormatUtils.formatCount(originalCount) + " / " + FormatUtils.formatDataSize(originalSize));
+        }
+
+        if (state != null) {
+            dropRequest.setState(state.toString());
+        }
+    }
+
     // requires write lock to be already acquired unless request is not mutable
     private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
         // holds the one response of all the node responses to return to the client
@@ -3158,8 +3231,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
-                        nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+                final ControllerServiceReferencingComponentsEntity nodeResponseEntity
+                        = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
                 final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@@ -3218,6 +3291,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             // create a new client response
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isDropRequestEndpoint(uri, method)) {
+            final DropRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(DropRequestEntity.class);
+            final DropRequestDTO dropRequest = responseEntity.getDropRequest();
+
+            final Map<NodeIdentifier, DropRequestDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final DropRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(DropRequestEntity.class);
+                final DropRequestDTO nodeDropRequest = nodeResponseEntity.getDropRequest();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeDropRequest);
+            }
+            mergeDropRequests(dropRequest, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);
@@ -3270,12 +3361,12 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
     /**
-     * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and
-     * is not comprised of responses from all nodes in the cluster (at least one node contained the counter in question).
+     * Determines if all problematic responses were due to 404 NOT_FOUND. Assumes that problematicNodeResponses is not empty and is not comprised of responses from all nodes in the cluster (at least
+     * one node contained the counter in question).
      *
-     * @param problematicNodeResponses  The problematic node responses
-     * @param uri                       The URI for the request
-     * @return                          Whether all problematic node responses were due to a missing counter
+     * @param problematicNodeResponses The problematic node responses
+     * @param uri The URI for the request
+     * @return Whether all problematic node responses were due to a missing counter
      */
     private boolean isMissingCounter(final Set<NodeResponse> problematicNodeResponses, final URI uri) {
         if (isCountersEndpoint(uri)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index b170d39..6741348 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -894,6 +894,7 @@ public class ConnectionResource extends ApplicationResource {
     /**
      * Drops the flowfiles in the queue of the specified connection.
      *
+     * @param httpServletRequest request
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
      * @param id The id of the connection
      * @return A dropRequestEntity
@@ -920,6 +921,7 @@ public class ConnectionResource extends ApplicationResource {
             }
     )
     public Response dropQueueContents(
+            @Context HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
                     required = false
@@ -936,6 +938,12 @@ public class ConnectionResource extends ApplicationResource {
             return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            return generateContinueResponse().build();
+        }
+
         // ensure the id is the same across the cluster
         final String dropRequestId;
         final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
@@ -947,7 +955,7 @@ public class ConnectionResource extends ApplicationResource {
 
         // submit the drop request
         final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id, dropRequestId);
-        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequest.getId()));
+        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "drop-requests", dropRequest.getId()));
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
@@ -978,7 +986,7 @@ public class ConnectionResource extends ApplicationResource {
     @GET
     @Consumes(MediaType.WILDCARD)
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+    @Path("/{connection-id}/drop-requests/{drop-request-id}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
             value = "Gets the current status of a drop request for the specified connection.",
@@ -1020,7 +1028,7 @@ public class ConnectionResource extends ApplicationResource {
 
         // get the drop request
         final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(groupId, connectionId, dropRequestId);
-        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
+        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "drop-requests", dropRequestId));
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();
@@ -1037,6 +1045,7 @@ public class ConnectionResource extends ApplicationResource {
     /**
      * Deletes the specified drop request.
      *
+     * @param httpServletRequest request
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
      * @param connectionId The connection id
      * @param dropRequestId The drop request id
@@ -1045,7 +1054,7 @@ public class ConnectionResource extends ApplicationResource {
     @DELETE
     @Consumes(MediaType.WILDCARD)
     @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-    @Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
+    @Path("/{connection-id}/drop-requests/{drop-request-id}")
     @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
             value = "Cancels and/or removes a request drop of the contents in this connection.",
@@ -1064,6 +1073,7 @@ public class ConnectionResource extends ApplicationResource {
             }
     )
     public Response removeDropRequest(
+            @Context HttpServletRequest httpServletRequest,
             @ApiParam(
                     value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
                     required = false
@@ -1085,9 +1095,15 @@ public class ConnectionResource extends ApplicationResource {
             return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            return generateContinueResponse().build();
+        }
+
         // delete the drop request
         final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId);
-        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
+        dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "drop-requests", dropRequestId));
 
         // create the revision
         final RevisionDTO revision = new RevisionDTO();

http://git-wip-us.apache.org/repos/asf/nifi/blob/39a050d2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 264268b..0758ce2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -315,6 +315,11 @@ public final class DtoFactory {
         dto.setDroppedSize(dropped.getByteCount());
         dto.setDropped(FormatUtils.formatCount(dropped.getObjectCount()) + " / " + FormatUtils.formatDataSize(dropped.getByteCount()));
 
+        final QueueSize current = dropRequest.getCurrentSize();
+        dto.setCurrentCount(current.getObjectCount());
+        dto.setCurrentSize(current.getByteCount());
+        dto.setCurrent(FormatUtils.formatCount(current.getObjectCount()) + " / " + FormatUtils.formatDataSize(current.getByteCount()));
+
         if (dropRequest.getOriginalSize() != null) {
             final QueueSize original = dropRequest.getOriginalSize();
             dto.setOriginalCount(original.getObjectCount());
@@ -326,13 +331,6 @@ public final class DtoFactory {
             dto.setPercentCompleted(0);
         }
 
-        if (dropRequest.getCurrentSize() != null) {
-            final QueueSize current = dropRequest.getCurrentSize();
-            dto.setCurrentCount(current.getObjectCount());
-            dto.setCurrentSize(current.getByteCount());
-            dto.setCurrent(FormatUtils.formatCount(current.getObjectCount()) + " / " + FormatUtils.formatDataSize(current.getByteCount()));
-        }
-
         return dto;
     }