You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/09/02 19:08:46 UTC

nifi git commit: NIFI-2708 Added merging of SiteToSite details NIFI-2708 Updated to include ports from clientDto for port merging.

Repository: nifi
Updated Branches:
  refs/heads/master d37480eaf -> 97e2f406d


NIFI-2708 Added merging of SiteToSite details
NIFI-2708 Updated to include ports from clientDto for port merging.

This closes #981


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97e2f406
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97e2f406
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97e2f406

Branch: refs/heads/master
Commit: 97e2f406da83602ab2cf42bf3b7fbfcb9ea1686c
Parents: d37480e
Author: Jeff Storck <jt...@gmail.com>
Authored: Wed Aug 31 19:34:57 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Sep 2 14:55:08 2016 -0400

----------------------------------------------------------------------
 .../http/StandardHttpResponseMerger.java        |   2 +
 .../endpoints/ControllerEndpointMerger.java     | 110 +++++++++++++++++++
 .../nifi/cluster/manager/PortEntityMerger.java  |   2 +-
 3 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/97e2f406/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
index 7b1d8f6..a7e4883 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
@@ -23,6 +23,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpi
 import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
 import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
@@ -131,6 +132,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
         endpointMergers.add(new LabelsEndpointMerger());
         endpointMergers.add(new FunnelEndpointMerger());
         endpointMergers.add(new FunnelsEndpointMerger());
+        endpointMergers.add(new ControllerEndpointMerger());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/97e2f406/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java
new file mode 100644
index 0000000..6e38860
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import com.google.common.collect.Sets;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.PortEntityMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ControllerEndpointMerger extends AbstractSingleDTOEndpoint<ControllerEntity, ControllerDTO> {
+    public static final Pattern CONTROLLER_URI_PATTERN = Pattern.compile("/nifi-api/site-to-site");
+    private PortEntityMerger portMerger = new PortEntityMerger();
+
+    @Override
+    protected Class<ControllerEntity> getEntityClass() {
+        return ControllerEntity.class;
+    }
+
+    @Override
+    protected ControllerDTO getDto(ControllerEntity entity) {
+        return entity.getController();
+    }
+
+    @Override
+    protected void mergeResponses(ControllerDTO clientDto, Map<NodeIdentifier, ControllerDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
+        ControllerDTO mergedController = clientDto;
+        final Map<String, Map<NodeIdentifier, PortDTO>> inputPortMap = new HashMap<>(); // map of port id to map of node id to port dto
+        final Map<String, Map<NodeIdentifier, PortDTO>> outputPortMap = new HashMap<>(); // map of port id to map of node id to port dto
+
+        for (final Map.Entry<NodeIdentifier, ControllerDTO> entry : dtoMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final ControllerDTO nodeController = entry.getValue();
+
+            // gather all input and output ports for merging, including the ports from clientDto
+            nodeController.getInputPorts().stream().forEach(inputPort -> inputPortMap.computeIfAbsent(inputPort.getId(), nodeIdToInputPort -> new HashMap<>()).put(nodeId, inputPort));
+            nodeController.getOutputPorts().stream().forEach(outputPort -> outputPortMap.computeIfAbsent(outputPort.getId(), nodeIdToOutputPort -> new HashMap<>()).put(nodeId, outputPort));
+        }
+
+        /*
+         * Note on port merging: only merge the ports if they exist in the client response and all node responses.  Due to authorization possibly different per node, only ports that have been
+         * returned from every node need to be merged.  If a node doesn't return a port DTO due to authorization issues, the responses for that port ID should be dropped from the client response.
+         */
+
+        // merge input ports
+        for (Map<NodeIdentifier, PortDTO> inputPortByNodeId : inputPortMap.values()) {
+            final Collection<PortDTO> nodeInputPorts = inputPortByNodeId.values();
+            if (!nodeInputPorts.isEmpty()) {
+                final PortDTO inputPort = nodeInputPorts.iterator().next();
+                final PortDTO clientInputPort = clientDto.getInputPorts().stream().filter(p -> p.getId().equals(inputPort.getId())).findFirst().orElse(null);
+                if (clientInputPort != null) {
+                    PortEntityMerger.mergeDtos(clientInputPort, inputPortByNodeId);
+                }
+            }
+        }
+
+        // merge output ports
+        for (Map<NodeIdentifier, PortDTO> outputPortByNodeId : outputPortMap.values()) {
+            final Collection<PortDTO> nodeOutputPorts = outputPortByNodeId.values();
+            if (!nodeOutputPorts.isEmpty()) {
+                final PortDTO outputPort = nodeOutputPorts.iterator().next();
+                final PortDTO clientOutputPort = clientDto.getInputPorts().stream().filter(p -> p.getId().equals(outputPort.getId())).findFirst().orElse(null);
+                if (clientOutputPort != null) {
+                    PortEntityMerger.mergeDtos(clientOutputPort, outputPortByNodeId);
+                }
+            }
+        }
+
+        // get intersection of input and output ports
+        final Set<PortDTO> clientInputPorts = Sets.newHashSet(clientDto.getInputPorts());
+        final Set<PortDTO> clientOutputPorts = Sets.newHashSet(clientDto.getOutputPorts());
+        dtoMap.values().forEach(controller -> {
+            clientInputPorts.retainAll(controller.getInputPorts());
+            clientOutputPorts.retainAll(controller.getOutputPorts());
+        });
+
+        clientDto.setInputPorts(clientInputPorts);
+        clientDto.setInputPortCount(clientInputPorts.size());
+        clientDto.setOutputPorts(clientOutputPorts);
+        clientDto.setOutputPortCount(clientOutputPorts.size());
+    }
+
+    @Override
+    public boolean canHandle(URI uri, String method) {
+        return "GET".equalsIgnoreCase(method) && CONTROLLER_URI_PATTERN.matcher(uri.getPath()).matches();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/97e2f406/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
index cd73084..2929741 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
@@ -65,7 +65,7 @@ public class PortEntityMerger implements ComponentEntityMerger<PortEntity>, Comp
                 statusNodeIdentifier.getApiPort());
     }
 
-    private static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) {
+    public static void mergeDtos(final PortDTO clientDto, final Map<NodeIdentifier, PortDTO> dtoMap) {
         // if unauthorized for the client dto, simple return
         if (clientDto == null) {
             return;