You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/06 04:36:04 UTC

[08/50] nifi git commit: NIFI-259: - Merging responses when clustered to populate node details. - Fixed bug when clearing processor state when clustered. - Cleared the table after successfully clearing state.

NIFI-259:
- Merged node responses when clustered to populate node details.
- Fixed bug when clearing processor state when clustered.
- Cleared the state table only after the state was successfully cleared.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3f4bd919
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3f4bd919
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3f4bd919

Branch: refs/heads/master
Commit: 3f4bd919a9455998209ad7de4dde8e8a3e65e985
Parents: 06f525b
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Jan 13 15:12:17 2016 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Jan 13 15:12:17 2016 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 65 +++++++++++++++++++-
 .../apache/nifi/web/api/ProcessorResource.java  |  9 +++
 .../webapp/js/nf/canvas/nf-component-state.js   | 10 +--
 3 files changed, 76 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3f4bd919/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index a58de2a..db98e7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -203,6 +203,7 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.ComponentStateDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
 import org.apache.nifi.web.api.dto.DropRequestDTO;
@@ -217,6 +218,8 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.StateEntryDTO;
+import org.apache.nifi.web.api.dto.StateMapDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
@@ -225,6 +228,7 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ComponentStateEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
@@ -311,6 +315,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
     public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
+    public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state");
     public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
 
     public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
@@ -327,9 +332,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
     public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
     public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
+    public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state");
     public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
     public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
     public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+    public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
 
     @Deprecated
     public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
@@ -2431,6 +2438,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
+    private static boolean isProcessorStateEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isProcessGroupEndpoint(final URI uri, final String method) {
         return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
     }
@@ -2498,6 +2509,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
+    private static boolean isControllerServiceStateEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
         if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
             return true;
@@ -2520,6 +2535,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return false;
     }
 
+    private static boolean isReportingTaskStateEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isDropRequestEndpoint(final URI uri, final String method) {
         if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) {
             return true;
@@ -2533,13 +2552,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
     static boolean isResponseInterpreted(final URI uri, final String method) {
-        return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
+        return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) || isProcessorStateEndpoint(uri, method)
                 || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
                 || isProcessGroupEndpoint(uri, method)
                 || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
                 || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
-                || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
-                || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method)
+                || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method)
+                || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method)
                 || isDropRequestEndpoint(uri, method);
     }
 
@@ -2558,6 +2577,28 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
     }
 
+    private void mergeComponentState(final ComponentStateDTO componentState, Map<NodeIdentifier, ComponentStateDTO> componentStateMap) {
+        final List<StateEntryDTO> localStateEntries = new ArrayList<>();
+
+        for (final Map.Entry<NodeIdentifier, ComponentStateDTO> nodeEntry : componentStateMap.entrySet()) {
+            final ComponentStateDTO nodeComponentState = nodeEntry.getValue();
+            final NodeIdentifier nodeId = nodeEntry.getKey();
+            final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
+
+            final StateMapDTO nodeLocalStateMap = nodeComponentState.getLocalState();
+            if (nodeLocalStateMap.getState() != null) {
+                for (final StateEntryDTO nodeStateEntry : nodeLocalStateMap.getState()) {
+                    nodeStateEntry.setClusterNodeId(nodeId.getId());
+                    nodeStateEntry.setClusterNodeAddress(nodeAddress);
+                    localStateEntries.add(nodeStateEntry);
+                }
+            }
+        }
+
+        // add all the local state entries
+        componentState.getLocalState().setState(localStateEntries);
+    }
+
     private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
         final ProvenanceResultsDTO results = provenanceDto.getResults();
         final ProvenanceRequestDTO request = provenanceDto.getRequest();
@@ -3452,6 +3493,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             mergeListingRequests(listingRequest, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && (isProcessorStateEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method))) {
+            final ComponentStateEntity responseEntity = clientResponse.getClientResponse().getEntity(ComponentStateEntity.class);
+            final ComponentStateDTO componentState = responseEntity.getComponentState();
+
+            final Map<NodeIdentifier, ComponentStateDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ComponentStateEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ComponentStateEntity.class);
+                final ComponentStateDTO nodeComponentState = nodeResponseEntity.getComponentState();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeComponentState);
+            }
+            mergeComponentState(componentState, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);

http://git-wip-us.apache.org/repos/asf/nifi/blob/3f4bd919/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 9418749..778c9bf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -624,6 +624,7 @@ public class ProcessorResource extends ApplicationResource {
     /**
      * Clears the state for a processor.
      *
+     * @param httpServletRequest servlet request
      * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
      * @param version The revision is used to verify the client is working with the latest version of the flow.
      * @param id The id of the processor
@@ -651,6 +652,7 @@ public class ProcessorResource extends ApplicationResource {
         }
     )
     public Response clearState(
+        @Context HttpServletRequest httpServletRequest,
         @ApiParam(
             value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
             required = false
@@ -672,6 +674,13 @@ public class ProcessorResource extends ApplicationResource {
             return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
         }
 
+        // handle expects request (usually from the cluster manager)
+        final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+        if (expects != null) {
+            serviceFacade.verifyCanClearProcessorState(groupId, id);
+            return generateContinueResponse().build();
+        }
+
         // get the revision specified by the user
         Long revision = null;
         if (version != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/3f4bd919/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js
index d4fb63c..c8a2d41 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js
@@ -141,7 +141,7 @@ nf.ComponentState = (function () {
             $.each(localState.state, function (i, stateEntry) {
                 componentStateData.addItem($.extend({
                     id: count++,
-                    scope: stateEntry.nodeAddress
+                    scope: stateEntry.clusterNodeAddress
                 }, stateEntry));
             });
         }
@@ -224,9 +224,6 @@ nf.ComponentState = (function () {
             // clear state link
             $('#clear-link').on('click', function () {
                 if ($(this).hasClass('disabled') === false) {
-                    // clear the table
-                    clearTable();
-
                     // clear the state
                     var revision = nf.Client.getRevision();
                     var component = $('#component-state-table').data('component');
@@ -242,6 +239,9 @@ nf.ComponentState = (function () {
                         // update the revision
                         nf.Client.setRevision(response.revision);
 
+                        // clear the table
+                        clearTable();
+
                         // reload the table with no state
                         loadComponentState()
                     }).fail(nf.Common.handleAjaxError);
@@ -256,7 +256,7 @@ nf.ComponentState = (function () {
 
             // conditionally show the cluster node identifier
             if (nf.Canvas.isClustered()) {
-                componentStateColumns.push({id: 'scope', field: 'scope', name: 'Scope', sortable: true, resizable: true, formatter: scopeFormatter});
+                componentStateColumns.push({id: 'scope', field: 'scope', name: 'Scope', sortable: true, resizable: true});
             }
 
             var componentStateOptions = {