You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/13 21:37:00 UTC

nifi git commit: NIFI-1413: Ensure that if a node's templates don't match the clusters that we take the following actions: -Local templates remain but aren't shown in the cluster's templates. -Any templates from the cluster that don't exist on the node a

Repository: nifi
Updated Branches:
  refs/heads/master a989f6b9c -> 6f6e1b32d


NIFI-1413: Ensure that if a node's templates don't match the clusters that we take the following actions: -Local templates remain but aren't shown in the cluster's templates. -Any templates from the cluster that don't exist on the node are added to the node. -Any conflicting template definitions are replaced by those in the cluster

This closes #596


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6f6e1b32
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6f6e1b32
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6f6e1b32

Branch: refs/heads/master
Commit: 6f6e1b32d98af87c335772fc00089a63b23e7bdf
Parents: a989f6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jun 30 10:48:46 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Jul 13 17:35:15 2016 -0400

----------------------------------------------------------------------
 .../http/StandardHttpResponseMerger.java        |  2 +
 .../http/endpoints/TemplatesEndpointMerger.java | 86 ++++++++++++++++++++
 .../controller/StandardFlowSynchronizer.java    | 49 ++++++++++-
 .../StandardXMLFlowConfigurationDAO.java        |  5 +-
 4 files changed, 136 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
index 63a2895..590c908 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
@@ -50,6 +50,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpoint
 import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.stream.io.NullOutputStream;
@@ -102,6 +103,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
         endpointMergers.add(new SystemDiagnosticsEndpointMerger());
         endpointMergers.add(new CountersEndpointMerger());
         endpointMergers.add(new FlowMerger());
+        endpointMergers.add(new TemplatesEndpointMerger());
     }
 
     public StandardHttpResponseMerger() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java
new file mode 100644
index 0000000..a07289d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.entity.TemplatesEntity;
+
+public class TemplatesEndpointMerger implements EndpointResponseMerger {
+    public static final Pattern TEMPLATES_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/templates");
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && TEMPLATES_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+
+    protected Class<TemplatesEntity> getEntityClass() {
+        return TemplatesEntity.class;
+    }
+
+    protected Set<TemplateDTO> getDtos(final TemplatesEntity entity) {
+        return entity.getTemplates();
+    }
+
+    protected String getComponentId(final TemplateDTO dto) {
+        return dto.getId();
+    }
+
+    @Override
+    public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
+        if (!canHandle(uri, method)) {
+            throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method);
+        }
+
+        final TemplatesEntity responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass());
+
+        // Find the templates that all nodes know about. We do this by mapping Template ID to Template and
+        // then for each node, removing any template whose ID is not known to that node. After iterating over
+        // all of the nodes, we are left with a Map whose contents are those Templates known by all nodes.
+        Map<String, TemplateDTO> templatesById = null;
+        for (final NodeResponse nodeResponse : successfulResponses) {
+            final TemplatesEntity entity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(TemplatesEntity.class);
+            final Set<TemplateDTO> templateDtos = entity.getTemplates();
+            final Map<String, TemplateDTO> nodeTemplatesById = templateDtos.stream().collect(Collectors.toMap(dto -> dto.getId(), dto -> dto));
+
+            if (templatesById == null) {
+                // Create new HashMap so that the map that we have is modifiable.
+                templatesById = new HashMap<>(nodeTemplatesById);
+            } else {
+                // Only keep templates that are known by this node.
+                templatesById.keySet().retainAll(nodeTemplatesById.keySet());
+            }
+        }
+
+        // Set the templates to the set of templates that all nodes know about
+        responseEntity.setTemplates(new HashSet<>(templatesById.values()));
+
+        // create a new client response
+        return new NodeResponse(clientResponse, responseEntity);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 7fedac0..8a6be16 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -142,7 +142,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     @Override
     public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor)
             throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
-        // TODO - Include templates
 
         // handle corner cases involving no proposed flow
         if (proposedFlow == null) {
@@ -285,6 +284,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                         rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
                     }
 
+                    // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need
+                    // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them.
+                    final Document existingFlowConfiguration = parseFlowBytes(existingFlow);
+                    if (existingFlowConfiguration != null) {
+                        final Element existingRootElement = (Element) existingFlowConfiguration.getElementsByTagName("flowController").item(0);
+                        if (existingRootElement != null) {
+                            final Element existingRootGroupElement = (Element) existingRootElement.getElementsByTagName("rootGroup").item(0);
+                            if (existingRootElement != null) {
+                                final FlowEncodingVersion existingEncodingVersion = FlowEncodingVersion.parse(existingFlowConfiguration.getDocumentElement());
+                                addLocalTemplates(existingRootGroupElement, rootGroup, existingEncodingVersion);
+                            }
+                        }
+                    }
+
                     final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
                     if (controllerServicesElement != null) {
                         final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
@@ -346,6 +359,29 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
+    private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) {
+        // Replace the templates with those from the proposed flow
+        final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
+        if (templateNodeList != null) {
+            for (final Element templateElement : templateNodeList) {
+                final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement);
+                final Template template = new Template(templateDto);
+
+                // If the Process Group does not have the template, add it.
+                if (processGroup.getTemplate(template.getIdentifier()) == null) {
+                    processGroup.addTemplate(template);
+                }
+            }
+        }
+
+        final List<Element> childGroupElements = getChildrenByTagName(processGroupElement, "processGroup");
+        for (final Element childGroupElement : childGroupElements) {
+            final String childGroupId = getString(childGroupElement, "id");
+            final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
+            addLocalTemplates(childGroupElement, childGroup, encodingVersion);
+        }
+    }
+
     void scaleRootGroup(final ProcessGroup rootGroup, final FlowEncodingVersion encodingVersion) {
         if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) {
             // Calculate new Positions if the encoding version of the flow is older than 1.0.
@@ -735,12 +771,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
 
         // Replace the templates with those from the proposed flow
         final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
-        for (final Template template : processGroup.getTemplates()) {
-            processGroup.removeTemplate(template);
-        }
         for (final Element templateElement : templateNodeList) {
             final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement);
             final Template template = new Template(templateDto);
+
+            // If the Process Group already has the template, remove it and add it again. We do this
+            // to ensure that all of the nodes have the same view of the template. Templates are immutable,
+            // so any two nodes that have a template with the same ID should have the exact same template.
+            // This just makes sure that they do.
+            if (processGroup.getTemplate(template.getIdentifier()) != null) {
+                processGroup.removeTemplate(template);
+            }
             processGroup.addTemplate(template);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index ffe212d..5047683 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.persistence;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -80,7 +79,9 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
 
         final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor);
         controller.synchronize(flowSynchronizer, dataFlow);
-        save(new ByteArrayInputStream(dataFlow.getFlow()));
+
+        // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
+        save(controller);
     }
 
     @Override