You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/10/06 20:19:33 UTC

[2/2] nifi git commit: NIFI-2777: NIFI-2856: - Only performing response merging when the node is the cluster cooridinator even if there is a single response. - Fixing PropertyDescriptor merging to ensure the 'choosen' descriptor is included in map of all

NIFI-2777:
NIFI-2856:
- Only performing response merging when the node is the cluster cooridinator even if there is a single response.
- Fixing PropertyDescriptor merging to ensure the 'choosen' descriptor is included in map of all responses.

This closes #1095.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bb6c5d9d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bb6c5d9d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bb6c5d9d

Branch: refs/heads/master
Commit: bb6c5d9d4ea02433ee8cb63cd142327c02f6e9da
Parents: 9304df4
Author: Matt Gilman <ma...@gmail.com>
Authored: Tue Oct 4 14:52:18 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 6 16:19:19 2016 -0400

----------------------------------------------------------------------
 .../coordination/http/HttpResponseMapper.java   |  65 ++++++
 .../coordination/http/HttpResponseMerger.java   |  65 ------
 .../http/StandardHttpResponseMapper.java        | 230 +++++++++++++++++++
 .../http/StandardHttpResponseMerger.java        | 229 ------------------
 .../StandardAsyncClusterResponse.java           |  22 +-
 .../ThreadPoolRequestReplicator.java            |  26 +--
 .../node/NodeClusterCoordinator.java            |  37 +--
 .../manager/PropertyDescriptorDtoMerger.java    |   2 +-
 .../http/StandardHttpResponseMapperSpec.groovy  | 217 +++++++++++++++++
 .../http/StandardHttpResponseMergerSpec.groovy  | 218 ------------------
 10 files changed, 557 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java
new file mode 100644
index 0000000..659f5e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+
+import java.net.URI;
+import java.util.Set;
+
+/**
+ * <p>
+ * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster
+ * and distilling them down to a single response that would be appropriate to respond with, to the
+ * user/client who made the original web requests.
+ * </p>
+ */
+public interface HttpResponseMapper {
+
+    /**
+     * Maps the responses from all nodes in the cluster to a single NodeResponse object that
+     * is appropriate to respond with
+     *
+     * @param uri the URI of the web request that was made
+     * @param httpMethod the HTTP Method that was used when making the request
+     * @param nodeResponses the responses received from the individual nodes
+     *
+     * @return a single NodeResponse that represents the response that should be returned to the user/client
+     */
+    NodeResponse mapResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses, boolean merge);
+
+    /**
+     * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
+     * that indicate that the node was unable to fulfill the request
+     *
+     * @param allResponses the responses to filter
+     *
+     * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
+     *         that indicate that the node was unable to fulfill the request
+     */
+    Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses);
+
+    /**
+     * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them
+     *
+     * @param uri the URI of the request
+     * @param httpMethod the HTTP Method of the request
+     * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise
+     */
+    boolean isResponseInterpreted(URI uri, String httpMethod);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java
deleted file mode 100644
index 6102b74..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http;
-
-import java.net.URI;
-import java.util.Set;
-
-import org.apache.nifi.cluster.manager.NodeResponse;
-
-/**
- * <p>
- * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster
- * and distilling them down to a single response that would be appropriate to respond with, to the
- * user/client who made the original web requests.
- * </p>
- */
-public interface HttpResponseMerger {
-
-    /**
-     * Maps the responses from all nodes in the cluster to a single NodeResponse object that
-     * is appropriate to respond with
-     *
-     * @param uri the URI of the web request that was made
-     * @param httpMethod the HTTP Method that was used when making the request
-     * @param nodeResponses the responses received from the individual nodes
-     *
-     * @return a single NodeResponse that represents the response that should be returned to the user/client
-     */
-    NodeResponse mergeResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses);
-
-    /**
-     * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
-     * that indicate that the node was unable to fulfill the request
-     *
-     * @param allResponses the responses to filter
-     *
-     * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
-     *         that indicate that the node was unable to fulfill the request
-     */
-    Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses);
-
-    /**
-     * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them
-     *
-     * @param uri the URI of the request
-     * @param httpMethod the HTTP Method of the request
-     * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise
-     */
-    boolean isResponseInterpreted(URI uri, String httpMethod);
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
new file mode 100644
index 0000000..68719fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http;
+
+import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FunnelsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StandardHttpResponseMapper implements HttpResponseMapper {
+
+    private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class);
+
+    private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
+
+    public StandardHttpResponseMapper(final NiFiProperties nifiProperties) {
+        final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+        long snapshotMillis;
+        try {
+            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
+        }
+        endpointMergers.add(new ControllerStatusEndpointMerger());
+        endpointMergers.add(new ControllerBulletinsEndpointMerger());
+        endpointMergers.add(new GroupStatusEndpointMerger());
+        endpointMergers.add(new ProcessorStatusEndpointMerger());
+        endpointMergers.add(new ConnectionStatusEndpointMerger());
+        endpointMergers.add(new PortStatusEndpointMerger());
+        endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
+        endpointMergers.add(new ProcessorEndpointMerger());
+        endpointMergers.add(new ProcessorsEndpointMerger());
+        endpointMergers.add(new ConnectionEndpointMerger());
+        endpointMergers.add(new ConnectionsEndpointMerger());
+        endpointMergers.add(new PortEndpointMerger());
+        endpointMergers.add(new InputPortsEndpointMerger());
+        endpointMergers.add(new OutputPortsEndpointMerger());
+        endpointMergers.add(new RemoteProcessGroupEndpointMerger());
+        endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
+        endpointMergers.add(new ProcessGroupEndpointMerger());
+        endpointMergers.add(new ProcessGroupsEndpointMerger());
+        endpointMergers.add(new FlowSnippetEndpointMerger());
+        endpointMergers.add(new ProvenanceQueryEndpointMerger());
+        endpointMergers.add(new ProvenanceEventEndpointMerger());
+        endpointMergers.add(new ControllerServiceEndpointMerger());
+        endpointMergers.add(new ControllerServicesEndpointMerger());
+        endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
+        endpointMergers.add(new ReportingTaskEndpointMerger());
+        endpointMergers.add(new ReportingTasksEndpointMerger());
+        endpointMergers.add(new DropRequestEndpointMerger());
+        endpointMergers.add(new ListFlowFilesEndpointMerger());
+        endpointMergers.add(new ComponentStateEndpointMerger());
+        endpointMergers.add(new BulletinBoardEndpointMerger());
+        endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis));
+        endpointMergers.add(new SystemDiagnosticsEndpointMerger());
+        endpointMergers.add(new CountersEndpointMerger());
+        endpointMergers.add(new FlowMerger());
+        endpointMergers.add(new ControllerConfigurationEndpointMerger());
+        endpointMergers.add(new CurrentUserEndpointMerger());
+        endpointMergers.add(new FlowConfigurationEndpointMerger());
+        endpointMergers.add(new TemplatesEndpointMerger());
+        endpointMergers.add(new LabelEndpointMerger());
+        endpointMergers.add(new LabelsEndpointMerger());
+        endpointMergers.add(new FunnelEndpointMerger());
+        endpointMergers.add(new FunnelsEndpointMerger());
+        endpointMergers.add(new ControllerEndpointMerger());
+    }
+
+    @Override
+    public NodeResponse mapResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses, final boolean merge) {
+        final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
+        if (!hasSuccess) {
+            // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that.
+            // Otherwise, it doesn't matter which one we choose. We do this because if we replicate
+            // a mutable request, it's possible that one node will respond with a 409, for instance, while
+            // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want
+            // the failed response.
+            final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next());
+
+            // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
+            // leave data lingering on the socket and ensures that we don't consume the content of the response
+            // that we intend to respond with
+            drainResponses(nodeResponses, clientResponse);
+            return clientResponse;
+        }
+
+        // Determine which responses are successful
+        final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
+        final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
+
+        final NodeResponse clientResponse;
+        if ("GET".equalsIgnoreCase(httpMethod) && problematicResponses.size() > 0) {
+            // If there are problematic responses, at least one of the nodes couldn't complete the request
+            clientResponse = problematicResponses.stream().filter(p -> p.getStatus() >= 400 && p.getStatus() < 500).findFirst().orElse(
+                    problematicResponses.stream().filter(p -> p.getStatus() > 500).findFirst().orElse(problematicResponses.iterator().next()));
+            return clientResponse;
+        } else {
+            // Choose any of the successful responses to be the 'chosen one'.
+            clientResponse = successResponses.iterator().next();
+        }
+
+        if (merge == false) {
+            return clientResponse;
+        }
+
+        EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
+        if (merger == null) {
+            return clientResponse;
+        }
+
+        final NodeResponse response = merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse);
+        return response;
+    }
+
+    @Override
+    public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) {
+        // Check if there are any 2xx responses
+        final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses);
+
+        if (containsSuccessfulResponse) {
+            // If there is a 2xx response, we consider a response to be problematic if it is not 2xx
+            return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
+        } else {
+            // If no node is successful, we consider a problematic response to be only those that are 5xx
+            return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet());
+        }
+    }
+
+    @Override
+    public boolean isResponseInterpreted(final URI uri, final String httpMethod) {
+        return getEndpointResponseMerger(uri, httpMethod) != null;
+    }
+
+    private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) {
+        return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null);
+    }
+
+    private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) {
+        return allResponses.stream().anyMatch(p -> p.is2xx());
+    }
+
+    private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) {
+        responses.stream()
+                .parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
+                .filter(response -> response != exclude) // don't include the explicitly excluded node
+                .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content
+                .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out
+    }
+
+    private void drainResponse(final NodeResponse response) {
+        if (response.hasThrowable()) {
+            return;
+        }
+
+        try {
+            ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream());
+        } catch (final IOException ioe) {
+            logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
deleted file mode 100644
index 512a0b3..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.coordination.http;
-
-import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FunnelsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.StreamingOutput;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-public class StandardHttpResponseMerger implements HttpResponseMerger {
-
-    private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class);
-
-    private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
-
-    public StandardHttpResponseMerger(final NiFiProperties nifiProperties) {
-        final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
-        long snapshotMillis;
-        try {
-            snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
-        } catch (final Exception e) {
-            snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
-        }
-        endpointMergers.add(new ControllerStatusEndpointMerger());
-        endpointMergers.add(new ControllerBulletinsEndpointMerger());
-        endpointMergers.add(new GroupStatusEndpointMerger());
-        endpointMergers.add(new ProcessorStatusEndpointMerger());
-        endpointMergers.add(new ConnectionStatusEndpointMerger());
-        endpointMergers.add(new PortStatusEndpointMerger());
-        endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
-        endpointMergers.add(new ProcessorEndpointMerger());
-        endpointMergers.add(new ProcessorsEndpointMerger());
-        endpointMergers.add(new ConnectionEndpointMerger());
-        endpointMergers.add(new ConnectionsEndpointMerger());
-        endpointMergers.add(new PortEndpointMerger());
-        endpointMergers.add(new InputPortsEndpointMerger());
-        endpointMergers.add(new OutputPortsEndpointMerger());
-        endpointMergers.add(new RemoteProcessGroupEndpointMerger());
-        endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
-        endpointMergers.add(new ProcessGroupEndpointMerger());
-        endpointMergers.add(new ProcessGroupsEndpointMerger());
-        endpointMergers.add(new FlowSnippetEndpointMerger());
-        endpointMergers.add(new ProvenanceQueryEndpointMerger());
-        endpointMergers.add(new ProvenanceEventEndpointMerger());
-        endpointMergers.add(new ControllerServiceEndpointMerger());
-        endpointMergers.add(new ControllerServicesEndpointMerger());
-        endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
-        endpointMergers.add(new ReportingTaskEndpointMerger());
-        endpointMergers.add(new ReportingTasksEndpointMerger());
-        endpointMergers.add(new DropRequestEndpointMerger());
-        endpointMergers.add(new ListFlowFilesEndpointMerger());
-        endpointMergers.add(new ComponentStateEndpointMerger());
-        endpointMergers.add(new BulletinBoardEndpointMerger());
-        endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis));
-        endpointMergers.add(new SystemDiagnosticsEndpointMerger());
-        endpointMergers.add(new CountersEndpointMerger());
-        endpointMergers.add(new FlowMerger());
-        endpointMergers.add(new ControllerConfigurationEndpointMerger());
-        endpointMergers.add(new CurrentUserEndpointMerger());
-        endpointMergers.add(new FlowConfigurationEndpointMerger());
-        endpointMergers.add(new TemplatesEndpointMerger());
-        endpointMergers.add(new LabelEndpointMerger());
-        endpointMergers.add(new LabelsEndpointMerger());
-        endpointMergers.add(new FunnelEndpointMerger());
-        endpointMergers.add(new FunnelsEndpointMerger());
-        endpointMergers.add(new ControllerEndpointMerger());
-    }
-
-    @Override
-    public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) {
-        if (nodeResponses.size() == 1) {
-            return nodeResponses.iterator().next();
-        }
-
-        final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
-        if (!hasSuccess) {
-            // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that.
-            // Otherwise, it doesn't matter which one we choose. We do this because if we replicate
-            // a mutable request, it's possible that one node will respond with a 409, for instance, while
-            // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want
-            // the failed response.
-            final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next());
-
-            // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
-            // leave data lingering on the socket and ensures that we don't consume the content of the response
-            // that we intend to respond with
-            drainResponses(nodeResponses, clientResponse);
-            return clientResponse;
-        }
-
-        // Determine which responses are successful
-        final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
-        final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
-
-        final NodeResponse clientResponse;
-        if ("GET".equalsIgnoreCase(httpMethod) && problematicResponses.size() > 0) {
-            // If there are problematic responses, at least one of the nodes couldn't complete the request
-            clientResponse = problematicResponses.stream().filter(p -> p.getStatus() >= 400 && p.getStatus() < 500).findFirst().orElse(
-                    problematicResponses.stream().filter(p -> p.getStatus() > 500).findFirst().orElse(problematicResponses.iterator().next()));
-            return clientResponse;
-        } else {
-            // Choose any of the successful responses to be the 'chosen one'.
-            clientResponse = successResponses.iterator().next();
-        }
-        EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
-        if (merger == null) {
-            return clientResponse;
-        }
-
-        final NodeResponse response = merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse);
-        return response;
-    }
-
-    @Override
-    public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) {
-        // Check if there are any 2xx responses
-        final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses);
-
-        if (containsSuccessfulResponse) {
-            // If there is a 2xx response, we consider a response to be problematic if it is not 2xx
-            return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
-        } else {
-            // If no node is successful, we consider a problematic response to be only those that are 5xx
-            return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet());
-        }
-    }
-
-    @Override
-    public boolean isResponseInterpreted(final URI uri, final String httpMethod) {
-        return getEndpointResponseMerger(uri, httpMethod) != null;
-    }
-
-    private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) {
-        return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null);
-    }
-
-    private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) {
-        return allResponses.stream().anyMatch(p -> p.is2xx());
-    }
-
-    private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) {
-        responses.stream()
-                .parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
-                .filter(response -> response != exclude) // don't include the explicitly excluded node
-                .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content
-                .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out
-    }
-
-    private void drainResponse(final NodeResponse response) {
-        if (response.hasThrowable()) {
-            return;
-        }
-
-        try {
-            ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream());
-        } catch (final IOException ioe) {
-            logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
index 3bcc8e7..926151e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -17,6 +17,12 @@
 
 package org.apache.nifi.cluster.coordination.http.replication;
 
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,12 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class);
 
@@ -40,10 +40,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     private final Set<NodeIdentifier> nodeIds;
     private final URI uri;
     private final String method;
-    private final HttpResponseMerger responseMerger;
+    private final HttpResponseMapper responseMapper;
     private final CompletionCallback completionCallback;
     private final Runnable completedResultFetchedCallback;
     private final long creationTimeNanos;
+    private final boolean merge;
 
     private final Map<NodeIdentifier, ResponseHolder> responseMap = new HashMap<>();
     private final AtomicInteger requestsCompleted = new AtomicInteger(0);
@@ -52,18 +53,19 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     private RuntimeException failure; // guarded by synchronizing on this
 
     public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds,
-        final HttpResponseMerger responseMerger, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback) {
+                                        final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
         this.id = id;
         this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
         this.uri = uri;
         this.method = method;
+        this.merge = merge;
 
         creationTimeNanos = System.nanoTime();
         for (final NodeIdentifier nodeId : nodeIds) {
             responseMap.put(nodeId, new ResponseHolder(creationTimeNanos));
         }
 
-        this.responseMerger = responseMerger;
+        this.responseMapper = responseMapper;
         this.completionCallback = completionCallback;
         this.completedResultFetchedCallback = completedResultFetchedCallback;
     }
@@ -142,7 +144,7 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
             .map(p -> p.getResponse())
             .filter(response -> response != null)
             .collect(Collectors.toSet());
-        mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses);
+        mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge);
 
         logger.debug("Notifying all that merged response is complete for {}", id);
         this.notifyAll();

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index c1ee77b..258588d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -27,8 +27,8 @@ import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
-import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.manager.NodeResponse;
@@ -85,7 +85,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     private final Client client; // the client to use for issuing requests
     private final int connectionTimeoutMs; // connection timeout per node request
     private final int readTimeoutMs; // read timeout per node request
-    private final HttpResponseMerger responseMerger;
+    private final HttpResponseMapper responseMapper;
     private final EventReporter eventReporter;
     private final RequestCompletionCallback callback;
     private final ClusterCoordinator clusterCoordinator;
@@ -140,7 +140,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         this.clusterCoordinator = clusterCoordinator;
         this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
         this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
-        this.responseMerger = new StandardHttpResponseMerger(nifiProperties);
+        this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
         this.eventReporter = eventReporter;
         this.callback = callback;
 
@@ -249,12 +249,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             lock.lock();
             try {
                 logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
-                return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
+                return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
             } finally {
                 lock.unlock();
             }
         } else {
-            return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
+            return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
         }
     }
 
@@ -269,7 +269,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
         }
 
-        return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false);
+        return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false);
     }
 
     /**
@@ -286,7 +286,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
      * @return an AsyncClusterResponse that can be used to obtain the result
      */
     private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
-                                           StandardAsyncClusterResponse response, boolean executionPhase) {
+                                           StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) {
 
         // state validation
         Objects.requireNonNull(nodeIds);
@@ -344,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         // create a response object if one was not already passed to us
         if (response == null) {
             response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
-                    responseMerger, completionCallback, responseConsumedCallback);
+                    responseMapper, completionCallback, responseConsumedCallback, merge);
             responseMap.put(requestId, response);
         }
 
@@ -358,7 +358,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         final boolean mutableRequest = isMutableRequest(method, uri.getPath());
         if (mutableRequest && performVerification) {
             logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
-            performVerification(nodeIds, method, uri, entity, updatedHeaders, response);
+            performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge);
             return response;
         }
 
@@ -383,7 +383,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     }
 
 
-    private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
+    private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse, boolean merge) {
         logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
 
         final Map<String, String> validationHeaders = new HashMap<>(headers);
@@ -418,7 +418,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                         // to all nodes and we are finished.
                         if (dissentingCount == 0) {
                             logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
-                            replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true);
+                            replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge);
                             return;
                         }
 
@@ -743,7 +743,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             // create the resource
             WebResource resource = client.resource(uri);
 
-            if (responseMerger.isResponseInterpreted(uri, method)) {
+            if (responseMapper.isResponseInterpreted(uri, method)) {
                 resource.addFilter(new GZIPContentEncodingFilter(false));
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 4b74e1b..c27f186 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -16,27 +16,12 @@
  */
 package org.apache.nifi.cluster.coordination.node;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
-import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.NodeEvent;
@@ -73,6 +58,22 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback {
 
     private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
@@ -974,7 +975,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
          * state even if they had problems handling the request.
          */
         if (mutableRequest) {
-            final HttpResponseMerger responseMerger = new StandardHttpResponseMerger(nifiProperties);
+            final HttpResponseMapper responseMerger = new StandardHttpResponseMapper(nifiProperties);
             final Set<NodeResponse> problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses);
 
             // all nodes failed

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
index e7ab881..3c18ced 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
@@ -33,7 +33,7 @@ public class PropertyDescriptorDtoMerger {
         for (final Map.Entry<NodeIdentifier, PropertyDescriptorDTO> nodeEntry : dtoMap.entrySet()) {
             final PropertyDescriptorDTO nodePropertyDescriptor = nodeEntry.getValue();
             final List<AllowableValueEntity> nodePropertyDescriptorAllowableValues = nodePropertyDescriptor.getAllowableValues();
-            if (clientPropertyDescriptor != nodePropertyDescriptor && nodePropertyDescriptorAllowableValues != null) {
+            if (nodePropertyDescriptorAllowableValues != null) {
                 nodePropertyDescriptorAllowableValues.stream().forEach(allowableValueEntity -> {
                     allowableValueMap.computeIfAbsent(nodePropertyDescriptorAllowableValues.indexOf(allowableValueEntity), propertyDescriptorToAllowableValue -> new ArrayList<>())
                             .add(allowableValueEntity);

http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
new file mode 100644
index 0000000..243fd1a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http
+
+import com.sun.jersey.api.client.ClientResponse
+import org.apache.nifi.cluster.manager.NodeResponse
+import org.apache.nifi.cluster.protocol.NodeIdentifier
+import org.apache.nifi.util.NiFiProperties
+import org.apache.nifi.web.api.dto.ConnectionDTO
+import org.apache.nifi.web.api.dto.ControllerConfigurationDTO
+import org.apache.nifi.web.api.dto.FunnelDTO
+import org.apache.nifi.web.api.dto.LabelDTO
+import org.apache.nifi.web.api.dto.PermissionsDTO
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO
+import org.apache.nifi.web.api.entity.ConnectionEntity
+import org.apache.nifi.web.api.entity.ConnectionsEntity
+import org.apache.nifi.web.api.entity.ControllerConfigurationEntity
+import org.apache.nifi.web.api.entity.FunnelEntity
+import org.apache.nifi.web.api.entity.FunnelsEntity
+import org.apache.nifi.web.api.entity.LabelEntity
+import org.apache.nifi.web.api.entity.LabelsEntity
+import org.codehaus.jackson.map.ObjectMapper
+import org.codehaus.jackson.map.SerializationConfig
+import org.codehaus.jackson.map.annotate.JsonSerialize
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector
+import spock.lang.Specification
+import spock.lang.Unroll
+
+@Unroll
+class StandardHttpResponseMapperSpec extends Specification {
+
+    def setup() {
+        def propFile = StandardHttpResponseMapperSpec.class.getResource("/conf/nifi.properties").getFile()
+        System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile
+    }
+
+    def cleanup() {
+        System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH
+    }
+
+    def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() {
+        given:
+        def responseMapper = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null))
+        def requestUri = new URI('http://server/resource')
+        def requestId = UUID.randomUUID().toString()
+        def Map<ClientResponse, Map<String, Integer>> mockToRequestEntity = [:]
+        def nodeResponseSet = nodeResponseData.collect {
+            int n = it.node
+            def clientResponse = Mock(ClientResponse)
+            mockToRequestEntity.put clientResponse, it
+            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId)
+        } as Set
+
+        when:
+        def returnedResponse = responseMapper.mapResponses(requestUri, 'get', nodeResponseSet, true).getStatus()
+
+        then:
+        mockToRequestEntity.entrySet().forEach {
+            ClientResponse mockClientResponse = it.key
+            _ * mockClientResponse.getStatus() >> it.value.status
+        }
+        0 * _
+        returnedResponse == expectedStatus
+
+        where:
+        nodeResponseData                                                                || expectedStatus
+        [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 401]] as Set || 401
+        [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 403]] as Set || 403
+        [[node: 1, status: 200], [node: 2, status: 403], [node: 3, status: 500]] as Set || 403
+        [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 500]] as Set || 500
+    }
+
+    def "MergeResponses: #responseEntities.size() HTTP 200 #httpMethod responses for #requestUriPart"() {
+        given: "json serialization setup"
+        def mapper = new ObjectMapper();
+        def jaxbIntrospector = new JaxbAnnotationIntrospector();
+        def SerializationConfig serializationConfig = mapper.getSerializationConfig();
+        mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector));
+
+        and: "setup of the data to be used in the test"
+        def responseMerger = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null))
+        def requestUri = new URI("http://server/$requestUriPart")
+        def requestId = UUID.randomUUID().toString()
+        def Map<ClientResponse, Object> mockToRequestEntity = [:]
+        def n = 0
+        def nodeResponseSet = responseEntities.collect {
+            ++n
+            def clientResponse = Mock(ClientResponse)
+            mockToRequestEntity.put clientResponse, it
+            new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId)
+        } as Set
+
+        when:
+        def returnedResponse = responseMerger.mapResponses(requestUri, httpMethod, nodeResponseSet, true)
+
+        then:
+        mockToRequestEntity.entrySet().forEach {
+            ClientResponse mockClientResponse = it.key
+            def entity = it.value
+            _ * mockClientResponse.getStatus() >> 200
+            1 * mockClientResponse.getEntity(_) >> entity
+        }
+        responseEntities.size() == mockToRequestEntity.size()
+        0 * _
+        def returnedJson = mapper.writeValueAsString(returnedResponse.getUpdatedEntity())
+        def expectedJson = mapper.writeValueAsString(expectedEntity)
+        returnedJson == expectedJson
+
+        where:
+        requestUriPart                                             | httpMethod | responseEntities                                                                                     ||
+                expectedEntity
+        'nifi-api/controller/config'                               | 'get'      | [
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))]                                                      ||
+                // expectedEntity
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
+        'nifi-api/controller/config'                               | 'put'      | [
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))]                                                      ||
+                // expectedEntity
+                new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+                        component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
+        "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'get'      | [
+                new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
+                        ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] as Set),
+                new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: new
+                        ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 100)))] as Set),
+                new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
+                        ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 500)), component: new ConnectionDTO())] as Set)] ||
+                // expectedEntity
+                new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
+                        status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900,
+                                input: '0 (900 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))] as Set)
+        "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'post'     | [
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+                        new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO()),
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status:
+                        new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))),
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+                        new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())]      ||
+                // expectedEntity
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
+                        status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, input: '0 (900 bytes)',
+                                output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))
+        "nifi-api/connections/${UUID.randomUUID()}"                | 'get'      | [
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+                        new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 400)), component: new ConnectionDTO()),
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status:
+                        new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))),
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+                        new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())]      ||
+                // expectedEntity
+                new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
+                        status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 1000,
+                                input: '0 (1,000 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))
+        "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'get'      | [
+                new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set),
+                new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set),
+                new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set)] ||
+                // expectedEntity
+                new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set)
+        "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'post'     | [
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()),
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())]      ||
+                // expectedEntity
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+        "nifi-api/labels/${UUID.randomUUID()}"                | 'get'      | [
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()),
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())]      ||
+                // expectedEntity
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+        "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'get'      | [
+                new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set),
+                new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set),
+                new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set)] ||
+                // expectedEntity
+                new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set)
+        "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'post'     | [
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()),
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())]      ||
+                // expectedEntity
+                new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+        "nifi-api/funnels/${UUID.randomUUID()}"                | 'get'      | [
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()),
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())]      ||
+                // expectedEntity
+                new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+    }
+}