You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/10/06 20:19:32 UTC
[1/2] nifi git commit: NIFI-2777: NIFI-2856: - Only performing
response merging when the node is the cluster coordinator even if there is a
single response. - Fixing PropertyDescriptor merging to ensure the 'chosen'
descriptor is included in map of all
Repository: nifi
Updated Branches:
refs/heads/master 9304df4de -> bb6c5d9d4
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy
deleted file mode 100644
index 03aa08a..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.coordination.http
-
-import com.sun.jersey.api.client.ClientResponse
-import org.apache.nifi.cluster.manager.NodeResponse
-import org.apache.nifi.cluster.protocol.NodeIdentifier
-import org.apache.nifi.util.NiFiProperties
-import org.apache.nifi.web.api.dto.AccessPolicyDTO
-import org.apache.nifi.web.api.dto.ConnectionDTO
-import org.apache.nifi.web.api.dto.ControllerConfigurationDTO
-import org.apache.nifi.web.api.dto.FunnelDTO
-import org.apache.nifi.web.api.dto.LabelDTO
-import org.apache.nifi.web.api.dto.PermissionsDTO
-import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO
-import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO
-import org.apache.nifi.web.api.entity.ConnectionEntity
-import org.apache.nifi.web.api.entity.ConnectionsEntity
-import org.apache.nifi.web.api.entity.ControllerConfigurationEntity
-import org.apache.nifi.web.api.entity.FunnelEntity
-import org.apache.nifi.web.api.entity.FunnelsEntity
-import org.apache.nifi.web.api.entity.LabelEntity
-import org.apache.nifi.web.api.entity.LabelsEntity
-import org.codehaus.jackson.map.ObjectMapper
-import org.codehaus.jackson.map.SerializationConfig
-import org.codehaus.jackson.map.annotate.JsonSerialize
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector
-import spock.lang.Specification
-import spock.lang.Unroll
-
-@Unroll
-class StandardHttpResponseMergerSpec extends Specification {
-
- def setup() {
- def propFile = StandardHttpResponseMergerSpec.class.getResource("/conf/nifi.properties").getFile()
- System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile
- }
-
- def cleanup() {
- System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH
- }
-
- def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() {
- given:
- def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null))
- def requestUri = new URI('http://server/resource')
- def requestId = UUID.randomUUID().toString()
- def Map<ClientResponse, Map<String, Integer>> mockToRequestEntity = [:]
- def nodeResponseSet = nodeResponseData.collect {
- int n = it.node
- def clientResponse = Mock(ClientResponse)
- mockToRequestEntity.put clientResponse, it
- new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId)
- } as Set
-
- when:
- def returnedResponse = responseMerger.mergeResponses(requestUri, 'get', nodeResponseSet).getStatus()
-
- then:
- mockToRequestEntity.entrySet().forEach {
- ClientResponse mockClientResponse = it.key
- _ * mockClientResponse.getStatus() >> it.value.status
- }
- 0 * _
- returnedResponse == expectedStatus
-
- where:
- nodeResponseData || expectedStatus
- [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 401]] as Set || 401
- [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 403]] as Set || 403
- [[node: 1, status: 200], [node: 2, status: 403], [node: 3, status: 500]] as Set || 403
- [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 500]] as Set || 500
- }
-
- def "MergeResponses: #responseEntities.size() HTTP 200 #httpMethod responses for #requestUriPart"() {
- given: "json serialization setup"
- def mapper = new ObjectMapper();
- def jaxbIntrospector = new JaxbAnnotationIntrospector();
- def SerializationConfig serializationConfig = mapper.getSerializationConfig();
- mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector));
-
- and: "setup of the data to be used in the test"
- def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null))
- def requestUri = new URI("http://server/$requestUriPart")
- def requestId = UUID.randomUUID().toString()
- def Map<ClientResponse, Object> mockToRequestEntity = [:]
- def n = 0
- def nodeResponseSet = responseEntities.collect {
- ++n
- def clientResponse = Mock(ClientResponse)
- mockToRequestEntity.put clientResponse, it
- new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId)
- } as Set
-
- when:
- def returnedResponse = responseMerger.mergeResponses(requestUri, httpMethod, nodeResponseSet)
-
- then:
- mockToRequestEntity.entrySet().forEach {
- ClientResponse mockClientResponse = it.key
- def entity = it.value
- _ * mockClientResponse.getStatus() >> 200
- 1 * mockClientResponse.getEntity(_) >> entity
- }
- responseEntities.size() == mockToRequestEntity.size()
- 0 * _
- def returnedJson = mapper.writeValueAsString(returnedResponse.getUpdatedEntity())
- def expectedJson = mapper.writeValueAsString(expectedEntity)
- returnedJson == expectedJson
-
- where:
- requestUriPart | httpMethod | responseEntities ||
- expectedEntity
- 'nifi-api/controller/config' | 'get' | [
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] ||
- // expectedEntity
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
- 'nifi-api/controller/config' | 'put' | [
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] ||
- // expectedEntity
- new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
- component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
- "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'get' | [
- new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
- ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] as Set),
- new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: new
- ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 100)))] as Set),
- new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
- ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 500)), component: new ConnectionDTO())] as Set)] ||
- // expectedEntity
- new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
- status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900,
- input: '0 (900 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))] as Set)
- "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'post' | [
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
- new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO()),
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status:
- new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))),
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
- new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] ||
- // expectedEntity
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
- status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, input: '0 (900 bytes)',
- output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))
- "nifi-api/connections/${UUID.randomUUID()}" | 'get' | [
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
- new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 400)), component: new ConnectionDTO()),
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status:
- new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))),
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
- new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] ||
- // expectedEntity
- new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
- status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 1000,
- input: '0 (1,000 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))
- "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'get' | [
- new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set),
- new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set),
- new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set)] ||
- // expectedEntity
- new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set)
- "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'post' | [
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()),
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] ||
- // expectedEntity
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
- "nifi-api/labels/${UUID.randomUUID()}" | 'get' | [
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()),
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] ||
- // expectedEntity
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
- "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'get' | [
- new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set),
- new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set),
- new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set)] ||
- // expectedEntity
- new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set)
- "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'post' | [
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()),
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] ||
- // expectedEntity
- new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
- "nifi-api/funnels/${UUID.randomUUID()}" | 'get' | [
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()),
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] ||
- // expectedEntity
- new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
- }
-}
[2/2] nifi git commit: NIFI-2777: NIFI-2856: - Only performing
response merging when the node is the cluster coordinator even if there is a
single response. - Fixing PropertyDescriptor merging to ensure the 'chosen'
descriptor is included in map of all
Posted by ma...@apache.org.
NIFI-2777:
NIFI-2856:
- Only performing response merging when the node is the cluster coordinator even if there is a single response.
- Fixing PropertyDescriptor merging to ensure the 'chosen' descriptor is included in map of all responses.
This closes #1095.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bb6c5d9d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bb6c5d9d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bb6c5d9d
Branch: refs/heads/master
Commit: bb6c5d9d4ea02433ee8cb63cd142327c02f6e9da
Parents: 9304df4
Author: Matt Gilman <ma...@gmail.com>
Authored: Tue Oct 4 14:52:18 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Oct 6 16:19:19 2016 -0400
----------------------------------------------------------------------
.../coordination/http/HttpResponseMapper.java | 65 ++++++
.../coordination/http/HttpResponseMerger.java | 65 ------
.../http/StandardHttpResponseMapper.java | 230 +++++++++++++++++++
.../http/StandardHttpResponseMerger.java | 229 ------------------
.../StandardAsyncClusterResponse.java | 22 +-
.../ThreadPoolRequestReplicator.java | 26 +--
.../node/NodeClusterCoordinator.java | 37 +--
.../manager/PropertyDescriptorDtoMerger.java | 2 +-
.../http/StandardHttpResponseMapperSpec.groovy | 217 +++++++++++++++++
.../http/StandardHttpResponseMergerSpec.groovy | 218 ------------------
10 files changed, 557 insertions(+), 554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java
new file mode 100644
index 0000000..659f5e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+
+import java.net.URI;
+import java.util.Set;
+
+/**
+ * <p>
+ * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster
+ * and distilling them down to a single response that would be appropriate to respond with, to the
+ * user/client who made the original web requests.
+ * </p>
+ */
+public interface HttpResponseMapper {
+
+ /**
+ * Maps the responses from all nodes in the cluster to a single NodeResponse object that
+ * is appropriate to respond with
+ *
+ * @param uri the URI of the web request that was made
+ * @param httpMethod the HTTP Method that was used when making the request
+ * @param nodeResponses the responses received from the individual nodes
+ *
+ * @return a single NodeResponse that represents the response that should be returned to the user/client
+ */
+ NodeResponse mapResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses, boolean merge);
+
+ /**
+ * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
+ * that indicate that the node was unable to fulfill the request
+ *
+ * @param allResponses the responses to filter
+ *
+ * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
+ * that indicate that the node was unable to fulfill the request
+ */
+ Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses);
+
+ /**
+ * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them
+ *
+ * @param uri the URI of the request
+ * @param httpMethod the HTTP Method of the request
+ * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise
+ */
+ boolean isResponseInterpreted(URI uri, String httpMethod);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java
deleted file mode 100644
index 6102b74..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.coordination.http;
-
-import java.net.URI;
-import java.util.Set;
-
-import org.apache.nifi.cluster.manager.NodeResponse;
-
-/**
- * <p>
- * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster
- * and distilling them down to a single response that would be appropriate to respond with, to the
- * user/client who made the original web requests.
- * </p>
- */
-public interface HttpResponseMerger {
-
- /**
- * Maps the responses from all nodes in the cluster to a single NodeResponse object that
- * is appropriate to respond with
- *
- * @param uri the URI of the web request that was made
- * @param httpMethod the HTTP Method that was used when making the request
- * @param nodeResponses the responses received from the individual nodes
- *
- * @return a single NodeResponse that represents the response that should be returned to the user/client
- */
- NodeResponse mergeResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses);
-
- /**
- * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
- * that indicate that the node was unable to fulfill the request
- *
- * @param allResponses the responses to filter
- *
- * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses
- * that indicate that the node was unable to fulfill the request
- */
- Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses);
-
- /**
- * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them
- *
- * @param uri the URI of the request
- * @param httpMethod the HTTP Method of the request
- * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise
- */
- boolean isResponseInterpreted(URI uri, String httpMethod);
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
new file mode 100644
index 0000000..68719fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http;
+
+import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.FunnelsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger;
+import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class StandardHttpResponseMapper implements HttpResponseMapper {
+
+ // One shared, immutable logger for the class instead of a reassignable per-instance field.
+ private static final Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class);
+
+ // Mergers are consulted in insertion order; the first one whose canHandle(...) accepts the request is used.
+ private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
+
+ /**
+  * Creates a mapper and registers the endpoint mergers it will consult, in the order listed below.
+  *
+  * @param nifiProperties the properties from which the component status snapshot frequency is read;
+  *            if the configured value cannot be parsed, the default frequency is used instead
+  */
+ public StandardHttpResponseMapper(final NiFiProperties nifiProperties) {
+     final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
+     long snapshotMillis;
+     try {
+         snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
+     } catch (final Exception e) {
+         // A bad value must not prevent construction, but it should not be hidden from the operator either.
+         logger.warn("Invalid value '{}' for property '{}'; falling back to default of '{}'",
+             snapshotFrequency, NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, e);
+         snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
+     }
+
+     // Registration order matters: lookup returns the first merger that can handle a request.
+     endpointMergers.add(new ControllerStatusEndpointMerger());
+     endpointMergers.add(new ControllerBulletinsEndpointMerger());
+     endpointMergers.add(new GroupStatusEndpointMerger());
+     endpointMergers.add(new ProcessorStatusEndpointMerger());
+     endpointMergers.add(new ConnectionStatusEndpointMerger());
+     endpointMergers.add(new PortStatusEndpointMerger());
+     endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
+     endpointMergers.add(new ProcessorEndpointMerger());
+     endpointMergers.add(new ProcessorsEndpointMerger());
+     endpointMergers.add(new ConnectionEndpointMerger());
+     endpointMergers.add(new ConnectionsEndpointMerger());
+     endpointMergers.add(new PortEndpointMerger());
+     endpointMergers.add(new InputPortsEndpointMerger());
+     endpointMergers.add(new OutputPortsEndpointMerger());
+     endpointMergers.add(new RemoteProcessGroupEndpointMerger());
+     endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
+     endpointMergers.add(new ProcessGroupEndpointMerger());
+     endpointMergers.add(new ProcessGroupsEndpointMerger());
+     endpointMergers.add(new FlowSnippetEndpointMerger());
+     endpointMergers.add(new ProvenanceQueryEndpointMerger());
+     endpointMergers.add(new ProvenanceEventEndpointMerger());
+     endpointMergers.add(new ControllerServiceEndpointMerger());
+     endpointMergers.add(new ControllerServicesEndpointMerger());
+     endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
+     endpointMergers.add(new ReportingTaskEndpointMerger());
+     endpointMergers.add(new ReportingTasksEndpointMerger());
+     endpointMergers.add(new DropRequestEndpointMerger());
+     endpointMergers.add(new ListFlowFilesEndpointMerger());
+     endpointMergers.add(new ComponentStateEndpointMerger());
+     endpointMergers.add(new BulletinBoardEndpointMerger());
+     endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis));
+     endpointMergers.add(new SystemDiagnosticsEndpointMerger());
+     endpointMergers.add(new CountersEndpointMerger());
+     endpointMergers.add(new FlowMerger());
+     endpointMergers.add(new ControllerConfigurationEndpointMerger());
+     endpointMergers.add(new CurrentUserEndpointMerger());
+     endpointMergers.add(new FlowConfigurationEndpointMerger());
+     endpointMergers.add(new TemplatesEndpointMerger());
+     endpointMergers.add(new LabelEndpointMerger());
+     endpointMergers.add(new LabelsEndpointMerger());
+     endpointMergers.add(new FunnelEndpointMerger());
+     endpointMergers.add(new FunnelsEndpointMerger());
+     endpointMergers.add(new ControllerEndpointMerger());
+ }
+
+ /**
+  * Chooses the single response to return to the client from the responses of the individual nodes,
+  * optionally merging the successful responses with an endpoint-specific merger.
+  *
+  * @param uri the URI of the request
+  * @param httpMethod the HTTP method of the request
+  * @param nodeResponses the responses received from the individual nodes
+  * @param merge whether an endpoint merger should be applied to the successful responses;
+  *            when <code>false</code> the chosen response is returned as-is
+  * @return the response that should be returned to the client
+  */
+ @Override
+ public NodeResponse mapResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses, final boolean merge) {
+     final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
+     if (!hasSuccess) {
+         // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that.
+         // Otherwise, it doesn't matter which one we choose. We do this because if we replicate
+         // a mutable request, it's possible that one node will respond with a 409, for instance, while
+         // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want
+         // the failed response.
+         final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next());
+
+         // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
+         // leave data lingering on the socket and ensures that we don't consume the content of the response
+         // that we intend to respond with
+         drainResponses(nodeResponses, clientResponse);
+         return clientResponse;
+     }
+
+     // Determine which responses are successful
+     final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
+     final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
+
+     if ("GET".equalsIgnoreCase(httpMethod) && !problematicResponses.isEmpty()) {
+         // If there are problematic responses, at least one of the nodes couldn't complete the request.
+         // Prefer a 4xx, then a 5xx, then whatever else is left. The 5xx test is '>= 500' so that a
+         // plain 500 is treated as a server error rather than being skipped.
+         return problematicResponses.stream().filter(p -> p.getStatus() >= 400 && p.getStatus() < 500).findFirst().orElse(
+             problematicResponses.stream().filter(p -> p.getStatus() >= 500).findFirst().orElse(problematicResponses.iterator().next()));
+     }
+
+     // Choose any of the successful responses to be the 'chosen one'.
+     final NodeResponse clientResponse = successResponses.iterator().next();
+     if (!merge) {
+         return clientResponse;
+     }
+
+     final EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
+     if (merger == null) {
+         // No merger knows this endpoint, so the chosen response is returned unmodified
+         return clientResponse;
+     }
+
+     return merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse);
+ }
+
+ @Override
+ public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) {
+     // What counts as "problematic" depends on whether any node answered with a 2xx
+     final boolean anySucceeded = hasSuccessfulResponse(allResponses);
+
+     if (!anySucceeded) {
+         // Nobody succeeded: only the 5xx responses are regarded as problematic
+         return allResponses.stream()
+             .filter(NodeResponse::is5xx)
+             .collect(Collectors.toSet());
+     }
+
+     // At least one node succeeded: every response that is not a 2xx is problematic
+     return allResponses.stream()
+         .filter(candidate -> !candidate.is2xx())
+         .collect(Collectors.toSet());
+ }
+
+ @Override
+ public boolean isResponseInterpreted(final URI uri, final String httpMethod) {
+     // A response is interpreted exactly when one of the registered mergers can handle the request
+     final EndpointResponseMerger handler = getEndpointResponseMerger(uri, httpMethod);
+     return handler != null;
+ }
+
+ /**
+  * Looks up the first registered merger that can handle the given request.
+  *
+  * @param uri the URI of the request
+  * @param httpMethod the HTTP method of the request
+  * @return the first matching merger in registration order, or <code>null</code> if none matches
+  */
+ private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) {
+     for (final EndpointResponseMerger candidate : endpointMergers) {
+         if (candidate.canHandle(uri, httpMethod)) {
+             return candidate;
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Tells whether at least one of the given responses is a 2xx.
+  *
+  * @param allResponses the responses to inspect
+  * @return <code>true</code> if any response reports 2xx, <code>false</code> otherwise (including for an empty set)
+  */
+ private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) {
+     for (final NodeResponse candidate : allResponses) {
+         if (candidate.is2xx()) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Drains the content of every given response other than the excluded one.
+  *
+  * @param responses the responses to drain
+  * @param exclude the response that must be left untouched (compared by identity)
+  */
+ private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) {
+     responses.stream()
+         .parallel() // several independent streams have to be consumed, so consume them in parallel
+         // leave out the excluded response, and any 150-NodeContinue responses because they contain no content
+         .filter(candidate -> candidate != exclude && candidate.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE)
+         .forEach(this::drainResponse);
+ }
+
+ /**
+  * Consumes and discards the entity of a single node response. Responses that carry a
+  * throwable are left alone; an I/O failure while draining is logged and not rethrown.
+  *
+  * @param response the response whose content should be discarded
+  */
+ private void drainResponse(final NodeResponse response) {
+     if (!response.hasThrowable()) {
+         try {
+             final StreamingOutput entity = (StreamingOutput) response.getResponse().getEntity();
+             entity.write(new NullOutputStream());
+         } catch (final IOException ioe) {
+             logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe);
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
deleted file mode 100644
index 512a0b3..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.coordination.http;
-
-import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.FunnelsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger;
-import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.StreamingOutput;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
-
-public class StandardHttpResponseMerger implements HttpResponseMerger {
-
- private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class);
-
- private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
-
- public StandardHttpResponseMerger(final NiFiProperties nifiProperties) {
- final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
- long snapshotMillis;
- try {
- snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
- }
- endpointMergers.add(new ControllerStatusEndpointMerger());
- endpointMergers.add(new ControllerBulletinsEndpointMerger());
- endpointMergers.add(new GroupStatusEndpointMerger());
- endpointMergers.add(new ProcessorStatusEndpointMerger());
- endpointMergers.add(new ConnectionStatusEndpointMerger());
- endpointMergers.add(new PortStatusEndpointMerger());
- endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
- endpointMergers.add(new ProcessorEndpointMerger());
- endpointMergers.add(new ProcessorsEndpointMerger());
- endpointMergers.add(new ConnectionEndpointMerger());
- endpointMergers.add(new ConnectionsEndpointMerger());
- endpointMergers.add(new PortEndpointMerger());
- endpointMergers.add(new InputPortsEndpointMerger());
- endpointMergers.add(new OutputPortsEndpointMerger());
- endpointMergers.add(new RemoteProcessGroupEndpointMerger());
- endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
- endpointMergers.add(new ProcessGroupEndpointMerger());
- endpointMergers.add(new ProcessGroupsEndpointMerger());
- endpointMergers.add(new FlowSnippetEndpointMerger());
- endpointMergers.add(new ProvenanceQueryEndpointMerger());
- endpointMergers.add(new ProvenanceEventEndpointMerger());
- endpointMergers.add(new ControllerServiceEndpointMerger());
- endpointMergers.add(new ControllerServicesEndpointMerger());
- endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
- endpointMergers.add(new ReportingTaskEndpointMerger());
- endpointMergers.add(new ReportingTasksEndpointMerger());
- endpointMergers.add(new DropRequestEndpointMerger());
- endpointMergers.add(new ListFlowFilesEndpointMerger());
- endpointMergers.add(new ComponentStateEndpointMerger());
- endpointMergers.add(new BulletinBoardEndpointMerger());
- endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis));
- endpointMergers.add(new SystemDiagnosticsEndpointMerger());
- endpointMergers.add(new CountersEndpointMerger());
- endpointMergers.add(new FlowMerger());
- endpointMergers.add(new ControllerConfigurationEndpointMerger());
- endpointMergers.add(new CurrentUserEndpointMerger());
- endpointMergers.add(new FlowConfigurationEndpointMerger());
- endpointMergers.add(new TemplatesEndpointMerger());
- endpointMergers.add(new LabelEndpointMerger());
- endpointMergers.add(new LabelsEndpointMerger());
- endpointMergers.add(new FunnelEndpointMerger());
- endpointMergers.add(new FunnelsEndpointMerger());
- endpointMergers.add(new ControllerEndpointMerger());
- }
-
- @Override
- public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) {
- if (nodeResponses.size() == 1) {
- return nodeResponses.iterator().next();
- }
-
- final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
- if (!hasSuccess) {
- // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that.
- // Otherwise, it doesn't matter which one we choose. We do this because if we replicate
- // a mutable request, it's possible that one node will respond with a 409, for instance, while
- // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want
- // the failed response.
- final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next());
-
- // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
- // leave data lingering on the socket and ensures that we don't consume the content of the response
- // that we intend to respond with
- drainResponses(nodeResponses, clientResponse);
- return clientResponse;
- }
-
- // Determine which responses are successful
- final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
- final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
-
- final NodeResponse clientResponse;
- if ("GET".equalsIgnoreCase(httpMethod) && problematicResponses.size() > 0) {
- // If there are problematic responses, at least one of the nodes couldn't complete the request
- clientResponse = problematicResponses.stream().filter(p -> p.getStatus() >= 400 && p.getStatus() < 500).findFirst().orElse(
- problematicResponses.stream().filter(p -> p.getStatus() > 500).findFirst().orElse(problematicResponses.iterator().next()));
- return clientResponse;
- } else {
- // Choose any of the successful responses to be the 'chosen one'.
- clientResponse = successResponses.iterator().next();
- }
- EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
- if (merger == null) {
- return clientResponse;
- }
-
- final NodeResponse response = merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse);
- return response;
- }
-
- @Override
- public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) {
- // Check if there are any 2xx responses
- final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses);
-
- if (containsSuccessfulResponse) {
- // If there is a 2xx response, we consider a response to be problematic if it is not 2xx
- return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
- } else {
- // If no node is successful, we consider a problematic response to be only those that are 5xx
- return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet());
- }
- }
-
- @Override
- public boolean isResponseInterpreted(final URI uri, final String httpMethod) {
- return getEndpointResponseMerger(uri, httpMethod) != null;
- }
-
- private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) {
- return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null);
- }
-
- private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) {
- return allResponses.stream().anyMatch(p -> p.is2xx());
- }
-
- private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) {
- responses.stream()
- .parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
- .filter(response -> response != exclude) // don't include the explicitly excluded node
- .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content
- .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out
- }
-
- private void drainResponse(final NodeResponse response) {
- if (response.hasThrowable()) {
- return;
- }
-
- try {
- ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream());
- } catch (final IOException ioe) {
- logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
index 3bcc8e7..926151e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -17,6 +17,12 @@
package org.apache.nifi.cluster.coordination.http.replication;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
@@ -27,12 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
-import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class);
@@ -40,10 +40,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private final Set<NodeIdentifier> nodeIds;
private final URI uri;
private final String method;
- private final HttpResponseMerger responseMerger;
+ private final HttpResponseMapper responseMapper;
private final CompletionCallback completionCallback;
private final Runnable completedResultFetchedCallback;
private final long creationTimeNanos;
+ private final boolean merge;
private final Map<NodeIdentifier, ResponseHolder> responseMap = new HashMap<>();
private final AtomicInteger requestsCompleted = new AtomicInteger(0);
@@ -52,18 +53,19 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private RuntimeException failure; // guarded by synchronizing on this
public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds,
- final HttpResponseMerger responseMerger, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback) {
+ final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
this.id = id;
this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
this.uri = uri;
this.method = method;
+ this.merge = merge;
creationTimeNanos = System.nanoTime();
for (final NodeIdentifier nodeId : nodeIds) {
responseMap.put(nodeId, new ResponseHolder(creationTimeNanos));
}
- this.responseMerger = responseMerger;
+ this.responseMapper = responseMapper;
this.completionCallback = completionCallback;
this.completedResultFetchedCallback = completedResultFetchedCallback;
}
@@ -142,7 +144,7 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
.map(p -> p.getResponse())
.filter(response -> response != null)
.collect(Collectors.toSet());
- mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses);
+ mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge);
logger.debug("Notifying all that merged response is complete for {}", id);
this.notifyAll();
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index c1ee77b..258588d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -27,8 +27,8 @@ import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
-import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
@@ -85,7 +85,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private final Client client; // the client to use for issuing requests
private final int connectionTimeoutMs; // connection timeout per node request
private final int readTimeoutMs; // read timeout per node request
- private final HttpResponseMerger responseMerger;
+ private final HttpResponseMapper responseMapper;
private final EventReporter eventReporter;
private final RequestCompletionCallback callback;
private final ClusterCoordinator clusterCoordinator;
@@ -140,7 +140,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
this.clusterCoordinator = clusterCoordinator;
this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
- this.responseMerger = new StandardHttpResponseMerger(nifiProperties);
+ this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
this.eventReporter = eventReporter;
this.callback = callback;
@@ -249,12 +249,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
lock.lock();
try {
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
- return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
+ return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
} finally {
lock.unlock();
}
} else {
- return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
+ return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
}
}
@@ -269,7 +269,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
}
- return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false);
+ return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false);
}
/**
@@ -286,7 +286,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @return an AsyncClusterResponse that can be used to obtain the result
*/
private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
- StandardAsyncClusterResponse response, boolean executionPhase) {
+ StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) {
// state validation
Objects.requireNonNull(nodeIds);
@@ -344,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// create a response object if one was not already passed to us
if (response == null) {
response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
- responseMerger, completionCallback, responseConsumedCallback);
+ responseMapper, completionCallback, responseConsumedCallback, merge);
responseMap.put(requestId, response);
}
@@ -358,7 +358,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final boolean mutableRequest = isMutableRequest(method, uri.getPath());
if (mutableRequest && performVerification) {
logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
- performVerification(nodeIds, method, uri, entity, updatedHeaders, response);
+ performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge);
return response;
}
@@ -383,7 +383,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
- private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
+ private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse, boolean merge) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
final Map<String, String> validationHeaders = new HashMap<>(headers);
@@ -418,7 +418,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// to all nodes and we are finished.
if (dissentingCount == 0) {
logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
- replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true);
+ replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge);
return;
}
@@ -743,7 +743,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// create the resource
WebResource resource = client.resource(uri);
- if (responseMerger.isResponseInterpreted(uri, method)) {
+ if (responseMapper.isResponseInterpreted(uri, method)) {
resource.addFilter(new GZIPContentEncodingFilter(false));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 4b74e1b..c27f186 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -16,27 +16,12 @@
*/
package org.apache.nifi.cluster.coordination.node;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
-import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
-import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
+import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
@@ -73,6 +58,22 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback {
private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
@@ -974,7 +975,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
* state even if they had problems handling the request.
*/
if (mutableRequest) {
- final HttpResponseMerger responseMerger = new StandardHttpResponseMerger(nifiProperties);
+ final HttpResponseMapper responseMerger = new StandardHttpResponseMapper(nifiProperties);
final Set<NodeResponse> problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses);
// all nodes failed
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
index e7ab881..3c18ced 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java
@@ -33,7 +33,7 @@ public class PropertyDescriptorDtoMerger {
for (final Map.Entry<NodeIdentifier, PropertyDescriptorDTO> nodeEntry : dtoMap.entrySet()) {
final PropertyDescriptorDTO nodePropertyDescriptor = nodeEntry.getValue();
final List<AllowableValueEntity> nodePropertyDescriptorAllowableValues = nodePropertyDescriptor.getAllowableValues();
- if (clientPropertyDescriptor != nodePropertyDescriptor && nodePropertyDescriptorAllowableValues != null) {
+ if (nodePropertyDescriptorAllowableValues != null) {
nodePropertyDescriptorAllowableValues.stream().forEach(allowableValueEntity -> {
allowableValueMap.computeIfAbsent(nodePropertyDescriptorAllowableValues.indexOf(allowableValueEntity), propertyDescriptorToAllowableValue -> new ArrayList<>())
.add(allowableValueEntity);
http://git-wip-us.apache.org/repos/asf/nifi/blob/bb6c5d9d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
new file mode 100644
index 0000000..243fd1a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.http
+
+import com.sun.jersey.api.client.ClientResponse
+import org.apache.nifi.cluster.manager.NodeResponse
+import org.apache.nifi.cluster.protocol.NodeIdentifier
+import org.apache.nifi.util.NiFiProperties
+import org.apache.nifi.web.api.dto.ConnectionDTO
+import org.apache.nifi.web.api.dto.ControllerConfigurationDTO
+import org.apache.nifi.web.api.dto.FunnelDTO
+import org.apache.nifi.web.api.dto.LabelDTO
+import org.apache.nifi.web.api.dto.PermissionsDTO
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO
+import org.apache.nifi.web.api.entity.ConnectionEntity
+import org.apache.nifi.web.api.entity.ConnectionsEntity
+import org.apache.nifi.web.api.entity.ControllerConfigurationEntity
+import org.apache.nifi.web.api.entity.FunnelEntity
+import org.apache.nifi.web.api.entity.FunnelsEntity
+import org.apache.nifi.web.api.entity.LabelEntity
+import org.apache.nifi.web.api.entity.LabelsEntity
+import org.codehaus.jackson.map.ObjectMapper
+import org.codehaus.jackson.map.SerializationConfig
+import org.codehaus.jackson.map.annotate.JsonSerialize
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector
+import spock.lang.Specification
+import spock.lang.Unroll
+
+@Unroll
+class StandardHttpResponseMapperSpec extends Specification { // Spock spec exercising StandardHttpResponseMapper.mapResponses(..., merge = true)
+
+ def setup() { // points NiFiProperties at the test-resource nifi.properties through a system property
+ def propFile = StandardHttpResponseMapperSpec.class.getResource("/conf/nifi.properties").getFile()
+ System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile
+ }
+
+ def cleanup() { // removes the system property installed by setup()
+ System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH
+ }
+
+ def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() { // status of the mapped response must equal the where: table's expectedStatus
+ given:
+ def responseMapper = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null))
+ def requestUri = new URI('http://server/resource')
+ def requestId = UUID.randomUUID().toString()
+ def Map<ClientResponse, Map<String, Integer>> mockToRequestEntity = [:] // mock response -> the [node, status] row it stands for
+ def nodeResponseSet = nodeResponseData.collect {
+ int n = it.node
+ def clientResponse = Mock(ClientResponse)
+ mockToRequestEntity.put clientResponse, it
+ new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId)
+ } as Set
+
+ when:
+ def returnedResponse = responseMapper.mapResponses(requestUri, 'get', nodeResponseSet, true).getStatus()
+
+ then:
+ mockToRequestEntity.entrySet().forEach {
+ ClientResponse mockClientResponse = it.key
+ _ * mockClientResponse.getStatus() >> it.value.status // any number of status reads, always answering the row's status
+ }
+ 0 * _ // no mock interaction other than the getStatus() stubs above is allowed
+ returnedResponse == expectedStatus
+
+ where:
+ nodeResponseData || expectedStatus
+ [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 401]] as Set || 401
+ [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 403]] as Set || 403
+ [[node: 1, status: 200], [node: 2, status: 403], [node: 3, status: 500]] as Set || 403
+ [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 500]] as Set || 500
+ }
+
+ def "MergeResponses: #responseEntities.size() HTTP 200 #httpMethod responses for #requestUriPart"() { // merged entity must serialize to the same JSON as expectedEntity
+ given: "json serialization setup"
+ def mapper = new ObjectMapper();
+ def jaxbIntrospector = new JaxbAnnotationIntrospector();
+ def SerializationConfig serializationConfig = mapper.getSerializationConfig();
+ mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); // null fields are left out of the JSON
+
+ and: "setup of the data to be used in the test"
+ def responseMapper = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null))
+ def requestUri = new URI("http://server/$requestUriPart")
+ def requestId = UUID.randomUUID().toString()
+ def Map<ClientResponse, Object> mockToRequestEntity = [:] // mock response -> the entity that node returns
+ def n = 0
+ def nodeResponseSet = responseEntities.collect {
+ ++n
+ def clientResponse = Mock(ClientResponse)
+ mockToRequestEntity.put clientResponse, it
+ new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId) // NOTE(review): method is hard-coded to "get" even for the 'put'/'post' rows - confirm this is intended
+ } as Set
+
+ when:
+ def returnedResponse = responseMapper.mapResponses(requestUri, httpMethod, nodeResponseSet, true)
+
+ then:
+ mockToRequestEntity.entrySet().forEach {
+ ClientResponse mockClientResponse = it.key
+ def entity = it.value
+ _ * mockClientResponse.getStatus() >> 200
+ 1 * mockClientResponse.getEntity(_) >> entity // every node's entity must be read exactly once
+ }
+ responseEntities.size() == mockToRequestEntity.size()
+ 0 * _ // no mock interaction other than the stubs above is allowed
+ def returnedJson = mapper.writeValueAsString(returnedResponse.getUpdatedEntity())
+ def expectedJson = mapper.writeValueAsString(expectedEntity)
+ returnedJson == expectedJson
+
+ where:
+ requestUriPart | httpMethod | responseEntities ||
+ expectedEntity
+ 'nifi-api/controller/config' | 'get' | [
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] ||
+ // expectedEntity
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
+ 'nifi-api/controller/config' | 'put' | [
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] ||
+ // expectedEntity
+ new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
+ component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
+ "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'get' | [
+ new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
+ ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] as Set),
+ new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: new
+ ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 100)))] as Set),
+ new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
+ ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 500)), component: new ConnectionDTO())] as Set)] ||
+ // expectedEntity
+ new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
+ status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900,
+ input: '0 (900 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))] as Set)
+ "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'post' | [
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+ new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO()),
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status:
+ new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))),
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+ new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] ||
+ // expectedEntity
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
+ status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, input: '0 (900 bytes)',
+ output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))
+ "nifi-api/connections/${UUID.randomUUID()}" | 'get' | [
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+ new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 400)), component: new ConnectionDTO()),
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status:
+ new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))),
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status:
+ new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] ||
+ // expectedEntity
+ new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false),
+ status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 1000,
+ input: '0 (1,000 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))
+ "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'get' | [
+ new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set),
+ new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set),
+ new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set)] ||
+ // expectedEntity
+ new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set)
+ "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'post' | [
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()),
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] ||
+ // expectedEntity
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+ "nifi-api/labels/${UUID.randomUUID()}" | 'get' | [
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()),
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] ||
+ // expectedEntity
+ new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+ "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'get' | [
+ new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set),
+ new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set),
+ new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set)] ||
+ // expectedEntity
+ new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set)
+ "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'post' | [
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()),
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] ||
+ // expectedEntity: a FunnelEntity, the same type as the node responses in this row
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+ "nifi-api/funnels/${UUID.randomUUID()}" | 'get' | [
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()),
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)),
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] ||
+ // expectedEntity
+ new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))
+ }
+}