You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/13 21:37:00 UTC
nifi git commit: NIFI-1413: Ensure that if a node's templates don't
match the clusters that we take the following actions: -Local templates
remain but aren't shown in the cluster's templates. -Any templates from the
cluster that don't exist on the node a
Repository: nifi
Updated Branches:
refs/heads/master a989f6b9c -> 6f6e1b32d
NIFI-1413: Ensure that if a node's templates don't match the clusters that we take the following actions: -Local templates remain but aren't shown in the cluster's templates. -Any templates from the cluster that don't exist on the node are added to the node. -Any conflicting template definitions are replaced by those in the cluster
This closes #596
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6f6e1b32
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6f6e1b32
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6f6e1b32
Branch: refs/heads/master
Commit: 6f6e1b32d98af87c335772fc00089a63b23e7bdf
Parents: a989f6b
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jun 30 10:48:46 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Wed Jul 13 17:35:15 2016 -0400
----------------------------------------------------------------------
.../http/StandardHttpResponseMerger.java | 2 +
.../http/endpoints/TemplatesEndpointMerger.java | 86 ++++++++++++++++++++
.../controller/StandardFlowSynchronizer.java | 49 ++++++++++-
.../StandardXMLFlowConfigurationDAO.java | 5 +-
4 files changed, 136 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
index 63a2895..590c908 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
@@ -50,6 +50,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpoint
import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger;
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.stream.io.NullOutputStream;
@@ -102,6 +103,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
endpointMergers.add(new SystemDiagnosticsEndpointMerger());
endpointMergers.add(new CountersEndpointMerger());
endpointMergers.add(new FlowMerger());
+ endpointMergers.add(new TemplatesEndpointMerger());
}
public StandardHttpResponseMerger() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java
new file mode 100644
index 0000000..a07289d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/TemplatesEndpointMerger.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.entity.TemplatesEntity;
+
+public class TemplatesEndpointMerger implements EndpointResponseMerger {
+ public static final Pattern TEMPLATES_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/templates");
+
+ @Override
+ public boolean canHandle(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && TEMPLATES_URI_PATTERN.matcher(uri.getPath()).matches();
+ }
+
+ protected Class<TemplatesEntity> getEntityClass() {
+ return TemplatesEntity.class;
+ }
+
+ protected Set<TemplateDTO> getDtos(final TemplatesEntity entity) {
+ return entity.getTemplates();
+ }
+
+ protected String getComponentId(final TemplateDTO dto) {
+ return dto.getId();
+ }
+
+ @Override
+ public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
+ if (!canHandle(uri, method)) {
+ throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method);
+ }
+
+ final TemplatesEntity responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass());
+
+ // Find the templates that all nodes know about. We do this by mapping Template ID to Template and
+ // then for each node, removing any template whose ID is not known to that node. After iterating over
+ // all of the nodes, we are left with a Map whose contents are those Templates known by all nodes.
+ Map<String, TemplateDTO> templatesById = null;
+ for (final NodeResponse nodeResponse : successfulResponses) {
+ final TemplatesEntity entity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(TemplatesEntity.class);
+ final Set<TemplateDTO> templateDtos = entity.getTemplates();
+ final Map<String, TemplateDTO> nodeTemplatesById = templateDtos.stream().collect(Collectors.toMap(dto -> dto.getId(), dto -> dto));
+
+ if (templatesById == null) {
+ // Create new HashMap so that the map that we have is modifiable.
+ templatesById = new HashMap<>(nodeTemplatesById);
+ } else {
+ // Only keep templates that are known by this node.
+ templatesById.keySet().retainAll(nodeTemplatesById.keySet());
+ }
+ }
+
+ // Set the templates to the set of templates that all nodes know about
+ responseEntity.setTemplates(new HashSet<>(templatesById.values()));
+
+ // create a new client response
+ return new NodeResponse(clientResponse, responseEntity);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 7fedac0..8a6be16 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -142,7 +142,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
@Override
public void sync(final FlowController controller, final DataFlow proposedFlow, final StringEncryptor encryptor)
throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException {
- // TODO - Include templates
// handle corner cases involving no proposed flow
if (proposedFlow == null) {
@@ -285,6 +284,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
rootGroup = updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor, encodingVersion);
}
+ // If there are any Templates that do not exist in the Proposed Flow that do exist in the 'existing flow', we need
+ // to ensure that we also add those to the appropriate Process Groups, so that we don't lose them.
+ final Document existingFlowConfiguration = parseFlowBytes(existingFlow);
+ if (existingFlowConfiguration != null) {
+ final Element existingRootElement = (Element) existingFlowConfiguration.getElementsByTagName("flowController").item(0);
+ if (existingRootElement != null) {
+ final Element existingRootGroupElement = (Element) existingRootElement.getElementsByTagName("rootGroup").item(0);
+ if (existingRootElement != null) {
+ final FlowEncodingVersion existingEncodingVersion = FlowEncodingVersion.parse(existingFlowConfiguration.getDocumentElement());
+ addLocalTemplates(existingRootGroupElement, rootGroup, existingEncodingVersion);
+ }
+ }
+ }
+
final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices");
if (controllerServicesElement != null) {
final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
@@ -346,6 +359,29 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
+ private void addLocalTemplates(final Element processGroupElement, final ProcessGroup processGroup, final FlowEncodingVersion encodingVersion) {
+ // Replace the templates with those from the proposed flow
+ final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
+ if (templateNodeList != null) {
+ for (final Element templateElement : templateNodeList) {
+ final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement);
+ final Template template = new Template(templateDto);
+
+ // If the Process Group does not have the template, add it.
+ if (processGroup.getTemplate(template.getIdentifier()) == null) {
+ processGroup.addTemplate(template);
+ }
+ }
+ }
+
+ final List<Element> childGroupElements = getChildrenByTagName(processGroupElement, "processGroup");
+ for (final Element childGroupElement : childGroupElements) {
+ final String childGroupId = getString(childGroupElement, "id");
+ final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
+ addLocalTemplates(childGroupElement, childGroup, encodingVersion);
+ }
+ }
+
void scaleRootGroup(final ProcessGroup rootGroup, final FlowEncodingVersion encodingVersion) {
if (encodingVersion == null || encodingVersion.getMajorVersion() < 1) {
// Calculate new Positions if the encoding version of the flow is older than 1.0.
@@ -735,12 +771,17 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// Replace the templates with those from the proposed flow
final List<Element> templateNodeList = getChildrenByTagName(processGroupElement, "template");
- for (final Template template : processGroup.getTemplates()) {
- processGroup.removeTemplate(template);
- }
for (final Element templateElement : templateNodeList) {
final TemplateDTO templateDto = TemplateUtils.parseDto(templateElement);
final Template template = new Template(templateDto);
+
+ // If the Process Group already has the template, remove it and add it again. We do this
+ // to ensure that all of the nodes have the same view of the template. Templates are immutable,
+ // so any two nodes that have a template with the same ID should have the exact same template.
+ // This just makes sure that they do.
+ if (processGroup.getTemplate(template.getIdentifier()) != null) {
+ processGroup.removeTemplate(template);
+ }
processGroup.addTemplate(template);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6f6e1b32/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index ffe212d..5047683 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.persistence;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -80,7 +79,9 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor);
controller.synchronize(flowSynchronizer, dataFlow);
- save(new ByteArrayInputStream(dataFlow.getFlow()));
+
+ // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
+ save(controller);
}
@Override