You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by markap14 <gi...@git.apache.org> on 2016/04/08 14:19:59 UTC

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

GitHub user markap14 opened a pull request:

    https://github.com/apache/nifi/pull/338

    NIFI-1727: Refactored logic for merging HTTP Requests

    Refactored logic for merging HTTP Requests that are federated across cluster

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markap14/nifi NIFI-1727

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/338.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #338
    
----
commit 42d141fe5c5bc438e3cbd8d30c9d13f1b9e5e25b
Author: Mark Payne <ma...@hotmail.com>
Date:   2016-04-08T12:17:11Z

    NIFI-1727: Refactored logic for merging HTTP Requests that are federated across cluster

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59989219
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.controller.service.ControllerServiceState;
    +import org.apache.nifi.web.api.dto.ControllerServiceDTO;
    +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
    +import org.apache.nifi.web.api.entity.ControllerServiceEntity;
    +
    +public class ControllerServiceEndpointMerger extends AbstractSingleEntityEndpoint<ControllerServiceEntity, ControllerServiceDTO> {
    +    public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
    +    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
    +            return true;
    +        } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
    +            return true;
    +        }
    +
    +        return false;
    +    }
    +
    +    @Override
    +    protected Class<ControllerServiceEntity> getEntityClass() {
    +        return ControllerServiceEntity.class;
    +    }
    +
    +    @Override
    +    protected ControllerServiceDTO getDto(ControllerServiceEntity entity) {
    +        return entity.getControllerService();
    +    }
    +
    +    @Override
    +    protected void mergeResponses(ControllerServiceDTO clientDto, Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
    +        final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
    +        final Set<ControllerServiceReferencingComponentDTO> referencingComponents = clientDto.getReferencingComponents();
    +        final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
    +
    +        String state = null;
    +        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : dtoMap.entrySet()) {
    +            final NodeIdentifier nodeId = nodeEntry.getKey();
    +            final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
    +
    +            if (state == null) {
    +                if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
    +                    state = ControllerServiceState.DISABLING.name();
    +                } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
    +                    state = ControllerServiceState.ENABLING.name();
    +                }
    +            }
    --- End diff --
    
    I am not sure I understand the logic here.
    It seems like you'll set _state_ once based on the state of the first element in the loop. That state is going to be used throughout the rest of the loop, regardless of the state of the next entry. Is that intentional?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r60088663
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServicesEndpointMerger.java ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.web.api.dto.ControllerServiceDTO;
    +import org.apache.nifi.web.api.entity.ControllerServicesEntity;
    +
    +public class ControllerServicesEndpointMerger extends AbstractMultiEntityEndpoint<ControllerServicesEntity, ControllerServiceDTO> {
    +    public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
    +    }
    +
    +    @Override
    +    protected Class<ControllerServicesEntity> getEntityClass() {
    +        return ControllerServicesEntity.class;
    +    }
    +
    +    @Override
    +    protected Set<ControllerServiceDTO> getDtos(ControllerServicesEntity entity) {
    +        return entity.getControllerServices();
    +    }
    +
    +    @Override
    +    protected String getComponentId(ControllerServiceDTO dto) {
    +        return dto.getId();
    +    }
    --- End diff --
    
    Not sure about the design considerations, must be missing something. . ., but the two methods above seem to be simply delegating to public accessors of the instance passed as an argument (e.g., dto.getId(), entity.getControllerServices() etc.). Since the calling class obviously has a reference to such instance, it could invoke those operations directly. Correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59988803
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +import javax.ws.rs.core.StreamingOutput;
    +
    +import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.stream.io.NullOutputStream;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class StandardHttpResponseMerger implements HttpResponseMerger {
    +    private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class);
    +    private static final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
    +
    +    static {
    +        endpointMergers.add(new ControllerStatusEndpointMerger());
    +        endpointMergers.add(new GroupStatusEndpointMerger());
    +        endpointMergers.add(new ProcessorStatusEndpointMerger());
    +        endpointMergers.add(new ConnectionStatusEndpiontMerger());
    +        endpointMergers.add(new PortStatusEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
    +        endpointMergers.add(new ProcessorEndpointMerger());
    +        endpointMergers.add(new ProcessorsEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
    +        endpointMergers.add(new ProcessGroupEndpointMerger());
    +        endpointMergers.add(new FlowSnippetEndpointMerger());
    +        endpointMergers.add(new ProvenanceQueryEndpointMerger());
    +        endpointMergers.add(new ProvenanceEventEndpointMerger());
    +        endpointMergers.add(new ControllerServiceEndpointMerger());
    +        endpointMergers.add(new ControllerServicesEndpointMerger());
    +        endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
    +        endpointMergers.add(new ReportingTaskEndpointMerger());
    +        endpointMergers.add(new ReportingTasksEndpointMerger());
    +        endpointMergers.add(new DropRequestEndpiontMerger());
    +        endpointMergers.add(new ListFlowFilesEndpointMerger());
    +        endpointMergers.add(new ComponentStateEndpointMerger());
    +        endpointMergers.add(new BulletinBoardEndpointMerger());
    +        endpointMergers.add(new StatusHistoryEndpointMerger());
    +        endpointMergers.add(new SystemDiagnosticsEndpointMerger());
    +        endpointMergers.add(new CountersEndpointMerger());
    +    }
    +
    +    @Override
    +    public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) {
    +        final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
    +        if (!hasSuccess) {
    +            // It doesn't really matter which response we choose, as all are problematic.
    +            final NodeResponse clientResponse = nodeResponses.iterator().next();
    +
    +            // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
    +            // leave data lingering on the socket and ensures that we don't consume the content of the response
    +            // that we intend to respond with
    +            drainResponses(nodeResponses, clientResponse);
    +            return clientResponse;
    +        }
    +
    +        // Determine which responses are successful
    +        final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
    +        final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
    --- End diff --
    
    Consider moving it down after determining a merger since it is not used unless merger != null.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by mcgilman <gi...@git.apache.org>.
Github user mcgilman commented on the pull request:

    https://github.com/apache/nifi/pull/338#issuecomment-207447096
  
    @apiri Pretty sure that Travis is already updated. Just the flag in Maven is still set to 1.7.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the pull request:

    https://github.com/apache/nifi/pull/338#issuecomment-211074685
  
    Mark, besides the initial comments there also seems to be a lot of repetitive logic throughout. Do you think it is something you want to address now?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59989329
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.controller.service.ControllerServiceState;
    +import org.apache.nifi.web.api.dto.ControllerServiceDTO;
    +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
    +import org.apache.nifi.web.api.entity.ControllerServiceEntity;
    +
    +public class ControllerServiceEndpointMerger extends AbstractSingleEntityEndpoint<ControllerServiceEntity, ControllerServiceDTO> {
    +    public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
    +    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
    +            return true;
    +        } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
    +            return true;
    +        }
    +
    +        return false;
    +    }
    +
    +    @Override
    +    protected Class<ControllerServiceEntity> getEntityClass() {
    +        return ControllerServiceEntity.class;
    +    }
    +
    +    @Override
    +    protected ControllerServiceDTO getDto(ControllerServiceEntity entity) {
    +        return entity.getControllerService();
    +    }
    +
    +    @Override
    +    protected void mergeResponses(ControllerServiceDTO clientDto, Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
    +        final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
    +        final Set<ControllerServiceReferencingComponentDTO> referencingComponents = clientDto.getReferencingComponents();
    +        final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
    +
    +        String state = null;
    +        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : dtoMap.entrySet()) {
    +            final NodeIdentifier nodeId = nodeEntry.getKey();
    +            final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
    +
    +            if (state == null) {
    +                if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
    +                    state = ControllerServiceState.DISABLING.name();
    +                } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
    +                    state = ControllerServiceState.ENABLING.name();
    +                }
    +            }
    --- End diff --
    
    Also, regardless of the answer on the previous comment, wouldn't the above be the same as
    ```
    if (state == null) {
        state = nodeControllerService.getState().name();
    }
    ```
    Also, just noticed; Are you performing _equals_ on String vs ControllerServiceState? I mean the above IF has no chance to be true


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r60087458
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.controller.service.ControllerServiceState;
    +import org.apache.nifi.web.api.dto.ControllerServiceDTO;
    +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
    +import org.apache.nifi.web.api.entity.ControllerServiceEntity;
    +
    +public class ControllerServiceEndpointMerger extends AbstractSingleEntityEndpoint<ControllerServiceEntity, ControllerServiceDTO> {
    +    public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
    +    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
    +            return true;
    +        } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
    +            return true;
    +        }
    +
    +        return false;
    +    }
    --- End diff --
    
    Hmm, IF -> true and ELSE -> true. I mean I see what you're doing, but i think it would be more readable if it was:
    ```
    return (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) 
    || ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath()));
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by mcgilman <gi...@git.apache.org>.
Github user mcgilman commented on the pull request:

    https://github.com/apache/nifi/pull/338#issuecomment-207433484
  
    @markap14 This PR doesn't build for me (or Travis) as the source flag in the pom is still set to 1.7.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59989423
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceReferenceEndpointMerger.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
    +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
    +
    +public class ControllerServiceReferenceEndpointMerger implements EndpointResponseMerger {
    +    public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
    --- End diff --
    
    Can you just return that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 closed the pull request at:

    https://github.com/apache/nifi/pull/338


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/338#issuecomment-207446046
  
    As this is moving us to 1.8, could you also please update the travis.yml for the branch to reflect it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r60090174
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.manager.StatusMerger;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.web.api.dto.BulletinDTO;
    +import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
    +import org.apache.nifi.web.api.entity.ControllerStatusEntity;
    +
    +public class ControllerStatusEndpointMerger extends AbstractSingleEntityEndpoint<ControllerStatusEntity, ControllerStatusDTO> {
    +    public static final Pattern CONTROLLER_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/status");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        return "GET".equalsIgnoreCase(method) && CONTROLLER_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
    +    }
    +
    +    @Override
    +    protected Class<ControllerStatusEntity> getEntityClass() {
    +        return ControllerStatusEntity.class;
    +    }
    +
    +    @Override
    +    protected ControllerStatusDTO getDto(ControllerStatusEntity entity) {
    +        return entity.getControllerStatus();
    +    }
    +
    +    @Override
    +    protected void mergeResponses(ControllerStatusDTO clientDto, Map<NodeIdentifier, ControllerStatusDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
    +        ControllerStatusDTO mergedStatus = clientDto;
    +        for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : dtoMap.entrySet()) {
    +            final NodeIdentifier nodeId = entry.getKey();
    +            final ControllerStatusDTO nodeStatus = entry.getValue();
    +
    +            final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
    +            for (final BulletinDTO bulletin : nodeStatus.getBulletins()) {
    +                bulletin.setNodeAddress(nodeAddress);
    +            }
    +            for (final BulletinDTO bulletin : nodeStatus.getControllerServiceBulletins()) {
    +                bulletin.setNodeAddress(nodeAddress);
    +            }
    +            for (final BulletinDTO bulletin : nodeStatus.getReportingTaskBulletins()) {
    +                bulletin.setNodeAddress(nodeAddress);
    +            }
    --- End diff --
    
    Also, since we are using Java 8 this could be further simplified into a single line
    ```
    Stream.concat(nodeStatus.getBulletins(), nodeStatus.getControllerServiceBulletins(), nodeStatus.getReportingTaskBulletins()).forEach(bulletin:: setNodeAddress)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59989402
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerServiceEndpointMerger.java ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.controller.service.ControllerServiceState;
    +import org.apache.nifi.web.api.dto.ControllerServiceDTO;
    +import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
    +import org.apache.nifi.web.api.entity.ControllerServiceEntity;
    +
    +public class ControllerServiceEndpointMerger extends AbstractSingleEntityEndpoint<ControllerServiceEntity, ControllerServiceDTO> {
    +    public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
    +    public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
    +            return true;
    +        } else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
    +            return true;
    +        }
    +
    +        return false;
    +    }
    +
    +    @Override
    +    protected Class<ControllerServiceEntity> getEntityClass() {
    +        return ControllerServiceEntity.class;
    +    }
    +
    +    @Override
    +    protected ControllerServiceDTO getDto(ControllerServiceEntity entity) {
    +        return entity.getControllerService();
    +    }
    +
    +    @Override
    +    protected void mergeResponses(ControllerServiceDTO clientDto, Map<NodeIdentifier, ControllerServiceDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
    +        final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
    +        final Set<ControllerServiceReferencingComponentDTO> referencingComponents = clientDto.getReferencingComponents();
    +        final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
    +
    +        String state = null;
    +        for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : dtoMap.entrySet()) {
    +            final NodeIdentifier nodeId = nodeEntry.getKey();
    +            final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
    +
    +            if (state == null) {
    +                if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
    +                    state = ControllerServiceState.DISABLING.name();
    +                } else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
    +                    state = ControllerServiceState.ENABLING.name();
    +                }
    +            }
    +
    +            for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
    +                nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents());
    +            }
    +
    +            // merge the validation errors
    +            mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
    +        }
    +
    +        // merge the referencing components
    +        mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
    +
    +        // store the 'transition' state is applicable
    +        if (state != null) {
    +            clientDto.setState(state);
    +        }
    +
    +        // set the merged the validation errors
    +        clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, dtoMap.size()));
    +    }
    +
    +    public static void mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentDTO> referencingComponents,
    +        Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
    +
    +        final Map<String, Integer> activeThreadCounts = new HashMap<>();
    +        final Map<String, String> states = new HashMap<>();
    +        for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) {
    +            final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue();
    +
    +            // go through all the nodes referencing components
    +            if (nodeReferencingComponents != null) {
    +                for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
    +                    // handle active thread counts
    +                    if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {
    +                        final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId());
    +                        if (current == null) {
    +                            activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount());
    +                        } else {
    +                            activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current);
    +                        }
    +                    }
    +
    +                    // handle controller service state
    +                    final String state = states.get(nodeReferencingComponent.getId());
    +                    if (state == null) {
    +                        if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) {
    +                            states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name());
    +                        } else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) {
    +                            states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name());
    +                        }
    --- End diff --
    
    Same comment as before about comparing String and ControllerServiceState


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r60093499
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.web.api.dto.FlowSnippetDTO;
    +import org.apache.nifi.web.api.dto.ProcessorDTO;
    +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
    +import org.apache.nifi.web.api.entity.FlowSnippetEntity;
    +
    +public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
    +    public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
    +    public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
    +
    +    @Override
    +    public boolean canHandle(final URI uri, final String method) {
    +        return "POST".equalsIgnoreCase(method) && (TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches() ||
    +            FLOW_SNIPPET_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches());
    +    }
    +
    +    @Override
    +    public NodeResponse merge(final URI uri, final String method, Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
    +        final FlowSnippetEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
    +        final FlowSnippetDTO contents = responseEntity.getContents();
    +
    +        if (contents == null) {
    +            return clientResponse;
    +        } else {
    +            final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>();
    +            final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>();
    +
    +            for (final NodeResponse nodeResponse : successfulResponses) {
    +                final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
    +                final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
    +
    +                for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
    +                    Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId());
    +                    if (innerMap == null) {
    +                        innerMap = new HashMap<>();
    +                        processorMap.put(nodeProcessor.getId(), innerMap);
    +                    }
    +
    +                    innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
    +                }
    +
    +                for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) {
    +                    Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
    +                    if (innerMap == null) {
    --- End diff --
    
    This pattern appears 5 times in this PR. May be we should consider some utility helper class
    ```
    public static Map<> getInnerMap(Map<NodeIdentifier, NiFiComponentDTO> parentMap, Sting id) {
         Map<NodeIdentifier, NiFiComponentDTO> innerMap = parentMap.get(id);
         if (innerMap == null) {
              innerMap = new HashMap<>();
              parentMap.put(id, innerMap);
         }
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59989063
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpiontMerger.java ---
    @@ -0,0 +1,74 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Map;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.StatusMerger;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
    +import org.apache.nifi.web.api.dto.status.NodeConnectionStatusSnapshotDTO;
    +import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
    +
    +public class ConnectionStatusEndpiontMerger extends AbstractNodeStatusEndpoint<ConnectionStatusEntity, ConnectionStatusDTO> {
    +    public static final Pattern CONNECTION_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/status");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        return "GET".equalsIgnoreCase(method) && CONNECTION_STATUS_URI_PATTERN.matcher(uri.getPath()).matches();
    +    }
    +
    +    @Override
    +    protected Class<ConnectionStatusEntity> getEntityClass() {
    +        return ConnectionStatusEntity.class;
    +    }
    +
    +    @Override
    +    protected ConnectionStatusDTO getDto(ConnectionStatusEntity entity) {
    +        return entity.getConnectionStatus();
    +    }
    +
    +    @Override
    +    protected void mergeResponses(ConnectionStatusDTO clientDto, Map<NodeIdentifier, ConnectionStatusDTO> dtoMap, NodeIdentifier selectedNodeId) {
    +        final ConnectionStatusDTO mergedConnectionStatus = clientDto;
    +        mergedConnectionStatus.setNodeSnapshots(new ArrayList<NodeConnectionStatusSnapshotDTO>());
    +
    +        final NodeConnectionStatusSnapshotDTO selectedNodeSnapshot = new NodeConnectionStatusSnapshotDTO();
    +        selectedNodeSnapshot.setStatusSnapshot(clientDto.getAggregateSnapshot().clone());
    +        selectedNodeSnapshot.setAddress(selectedNodeId.getApiAddress());
    +        selectedNodeSnapshot.setApiPort(selectedNodeId.getApiPort());
    +        selectedNodeSnapshot.setNodeId(selectedNodeId.getId());
    +
    +        mergedConnectionStatus.getNodeSnapshots().add(selectedNodeSnapshot);
    +
    +        // merge the other nodes
    +        for (final Map.Entry<NodeIdentifier, ConnectionStatusDTO> entry : dtoMap.entrySet()) {
    +            final NodeIdentifier nodeId = entry.getKey();
    +            final ConnectionStatusDTO nodeConnectionStatus = entry.getValue();
    +            if (nodeConnectionStatus == clientDto) {
    +                continue;
    +            }
    +
    +            StatusMerger.merge(mergedConnectionStatus, nodeConnectionStatus, nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
    --- End diff --
    
    I think it would be better (from readability) to 
    ```
    if (nodeConnectionStatus != clientDto) {
        StatusMerger.merge(. . .)
    }
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r60092307
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ListFlowFilesEndpointMerger.java ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.Set;
    +import java.util.TreeSet;
    +import java.util.regex.Pattern;
    +
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.controller.queue.ListFlowFileState;
    +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
    +import org.apache.nifi.web.api.dto.ListingRequestDTO;
    +import org.apache.nifi.web.api.dto.QueueSizeDTO;
    +import org.apache.nifi.web.api.entity.ListingRequestEntity;
    +
    +public class ListFlowFilesEndpointMerger extends AbstractSingleEntityEndpoint<ListingRequestEntity, ListingRequestDTO> {
    +    public static final Pattern LISTING_REQUESTS_URI = Pattern
    +        .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests");
    +    public static final Pattern LISTING_REQUEST_URI = Pattern
    +        .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
    +
    +    @Override
    +    public boolean canHandle(URI uri, String method) {
    +        if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) {
    +            return true;
    +        } else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) {
    +            return true;
    +        }
    +
    +        return false;
    +    }
    +
    +    @Override
    +    protected Class<ListingRequestEntity> getEntityClass() {
    +        return ListingRequestEntity.class;
    +    }
    +
    +    @Override
    +    protected ListingRequestDTO getDto(ListingRequestEntity entity) {
    +        return entity.getListingRequest();
    +    }
    +
    +    @Override
    +    protected void mergeResponses(ListingRequestDTO clientDto, Map<NodeIdentifier, ListingRequestDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
    +        final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
    +            @Override
    +            public int compare(final FlowFileSummaryDTO dto1, final FlowFileSummaryDTO dto2) {
    +                int positionCompare = dto1.getPosition().compareTo(dto2.getPosition());
    +                if (positionCompare != 0) {
    +                    return positionCompare;
    +                }
    +
    +                final String address1 = dto1.getClusterNodeAddress();
    +                final String address2 = dto2.getClusterNodeAddress();
    +                if (address1 == null && address2 == null) {
    +                    return 0;
    +                }
    +                if (address1 == null) {
    +                    return 1;
    +                }
    +                if (address2 == null) {
    +                    return -1;
    +                }
    +                return address1.compareTo(address2);
    +            }
    +        };
    +
    +        final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
    +
    +        ListFlowFileState state = null;
    +        int numStepsCompleted = 0;
    +        int numStepsTotal = 0;
    +        int objectCount = 0;
    +        long byteCount = 0;
    +        boolean finished = true;
    +        for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : dtoMap.entrySet()) {
    +            final NodeIdentifier nodeIdentifier = entry.getKey();
    +            final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
    +
    +            final ListingRequestDTO nodeRequest = entry.getValue();
    +
    +            numStepsTotal++;
    +            if (Boolean.TRUE.equals(nodeRequest.getFinished())) {
    +                numStepsCompleted++;
    +            }
    +
    +            final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize();
    +            objectCount += nodeQueueSize.getObjectCount();
    +            byteCount += nodeQueueSize.getByteCount();
    +
    +            if (!nodeRequest.getFinished()) {
    +                finished = false;
    +            }
    --- End diff --
    
    Couple of questions here
    1. Should this just be ```finished = nodeRequest.getFinished()```
    2. It seems like you are setting it while iterating, but then using its value outside the loop (line:152). So I am assuming that one node can be finished and another is not and the value of 'finished' really depends on the order of iteration. Is that intentional?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by apiri <gi...@git.apache.org>.
Github user apiri commented on the pull request:

    https://github.com/apache/nifi/pull/338#issuecomment-207448114
  
    Whoops, right you are.  That's what I get for not scoping things out.  Carry on 😎


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59988925
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/AbstractMultiEntityEndpoint.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http.endpoints;
    +
    +import java.net.URI;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
    +import org.apache.nifi.web.api.entity.Entity;
    +
    +public abstract class AbstractMultiEntityEndpoint<EntityType extends Entity, DtoType> implements EndpointResponseMerger {
    +
    +    @Override
    +    public final NodeResponse merge(final URI uri, final String method, final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses, final NodeResponse clientResponse) {
    +        if (!canHandle(uri, method)) {
    +            throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method);
    +        }
    +
    +        final EntityType responseEntity = clientResponse.getClientResponse().getEntity(getEntityClass());
    +        final Set<DtoType> dtos = getDtos(responseEntity);
    +
    +        final Map<String, Map<NodeIdentifier, DtoType>> dtoMap = new HashMap<>();
    +        for (final NodeResponse nodeResponse : successfulResponses) {
    +            final EntityType nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(getEntityClass());
    +            final Set<DtoType> nodeDtos = getDtos(nodeResponseEntity);
    +
    +            for (final DtoType nodeDto : nodeDtos) {
    +                final NodeIdentifier nodeId = nodeResponse.getNodeId();
    +                Map<NodeIdentifier, DtoType> innerMap = dtoMap.get(nodeId);
    +                if (innerMap == null) {
    --- End diff --
    
    Any possibility of this code to be invoked by multiple threads?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r60086063
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +import javax.ws.rs.core.StreamingOutput;
    +
    +import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.stream.io.NullOutputStream;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class StandardHttpResponseMerger implements HttpResponseMerger {
    +    private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class);
    +    private static final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
    +
    +    static {
    +        endpointMergers.add(new ControllerStatusEndpointMerger());
    +        endpointMergers.add(new GroupStatusEndpointMerger());
    +        endpointMergers.add(new ProcessorStatusEndpointMerger());
    +        endpointMergers.add(new ConnectionStatusEndpiontMerger());
    +        endpointMergers.add(new PortStatusEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
    +        endpointMergers.add(new ProcessorEndpointMerger());
    +        endpointMergers.add(new ProcessorsEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
    +        endpointMergers.add(new ProcessGroupEndpointMerger());
    +        endpointMergers.add(new FlowSnippetEndpointMerger());
    +        endpointMergers.add(new ProvenanceQueryEndpointMerger());
    +        endpointMergers.add(new ProvenanceEventEndpointMerger());
    +        endpointMergers.add(new ControllerServiceEndpointMerger());
    +        endpointMergers.add(new ControllerServicesEndpointMerger());
    +        endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
    +        endpointMergers.add(new ReportingTaskEndpointMerger());
    +        endpointMergers.add(new ReportingTasksEndpointMerger());
    +        endpointMergers.add(new DropRequestEndpiontMerger());
    +        endpointMergers.add(new ListFlowFilesEndpointMerger());
    +        endpointMergers.add(new ComponentStateEndpointMerger());
    +        endpointMergers.add(new BulletinBoardEndpointMerger());
    +        endpointMergers.add(new StatusHistoryEndpointMerger());
    +        endpointMergers.add(new SystemDiagnosticsEndpointMerger());
    +        endpointMergers.add(new CountersEndpointMerger());
    +    }
    +
    +    @Override
    +    public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) {
    +        final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
    +        if (!hasSuccess) {
    +            // It doesn't really matter which response we choose, as all are problematic.
    +            final NodeResponse clientResponse = nodeResponses.iterator().next();
    +
    +            // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
    +            // leave data lingering on the socket and ensures that we don't consume the content of the response
    +            // that we intend to respond with
    +            drainResponses(nodeResponses, clientResponse);
    +            return clientResponse;
    +        }
    +
    +        // Determine which responses are successful
    +        final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
    +        final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
    +
    +        // Choose any of the successful responses to be the 'chosen one'.
    +        final NodeResponse clientResponse = successResponses.iterator().next();
    +
    +        final EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
    +        if (merger == null) {
    +            return clientResponse;
    +        }
    +
    +        return merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse);
    +    }
    +
    +    @Override
    +    public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) {
    +        // Check if there are any 2xx responses
    +        final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses);
    +
    +        if (containsSuccessfulResponse) {
    +            // If there is a 2xx response, we consider a response to be problematic if it is not 2xx
    +            return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
    +        } else {
    +            // If no node is successful, we consider a problematic response to be only those that are 5xx
    +            return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet());
    +        }
    --- End diff --
    
    One of the benefits of functional programming is the ability to create a reference to a function and then reuse it. So the above could be further simplified by doing something like this:
    ```
    Predicate<?> predicate = containsSuccessfulResponse ? p -> !p.is2xx() : p -> p.is5xx()
    return allResponses.stream().filter(predicate).collect(Collectors.toSet());
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request: NIFI-1727: Refactored logic for merging HTTP Re...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/338#discussion_r59988860
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.cluster.coordination.http;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +import javax.ws.rs.core.StreamingOutput;
    +
    +import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger;
    +import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger;
    +import org.apache.nifi.cluster.manager.NodeResponse;
    +import org.apache.nifi.stream.io.NullOutputStream;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class StandardHttpResponseMerger implements HttpResponseMerger {
    +    private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class);
    +    private static final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
    +
    +    static {
    +        endpointMergers.add(new ControllerStatusEndpointMerger());
    +        endpointMergers.add(new GroupStatusEndpointMerger());
    +        endpointMergers.add(new ProcessorStatusEndpointMerger());
    +        endpointMergers.add(new ConnectionStatusEndpiontMerger());
    +        endpointMergers.add(new PortStatusEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger());
    +        endpointMergers.add(new ProcessorEndpointMerger());
    +        endpointMergers.add(new ProcessorsEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupEndpointMerger());
    +        endpointMergers.add(new RemoteProcessGroupsEndpointMerger());
    +        endpointMergers.add(new ProcessGroupEndpointMerger());
    +        endpointMergers.add(new FlowSnippetEndpointMerger());
    +        endpointMergers.add(new ProvenanceQueryEndpointMerger());
    +        endpointMergers.add(new ProvenanceEventEndpointMerger());
    +        endpointMergers.add(new ControllerServiceEndpointMerger());
    +        endpointMergers.add(new ControllerServicesEndpointMerger());
    +        endpointMergers.add(new ControllerServiceReferenceEndpointMerger());
    +        endpointMergers.add(new ReportingTaskEndpointMerger());
    +        endpointMergers.add(new ReportingTasksEndpointMerger());
    +        endpointMergers.add(new DropRequestEndpiontMerger());
    +        endpointMergers.add(new ListFlowFilesEndpointMerger());
    +        endpointMergers.add(new ComponentStateEndpointMerger());
    +        endpointMergers.add(new BulletinBoardEndpointMerger());
    +        endpointMergers.add(new StatusHistoryEndpointMerger());
    +        endpointMergers.add(new SystemDiagnosticsEndpointMerger());
    +        endpointMergers.add(new CountersEndpointMerger());
    +    }
    +
    +    @Override
    +    public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) {
    +        final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
    +        if (!hasSuccess) {
    +            // It doesn't really matter which response we choose, as all are problematic.
    +            final NodeResponse clientResponse = nodeResponses.iterator().next();
    +
    +            // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't
    +            // leave data lingering on the socket and ensures that we don't consume the content of the response
    +            // that we intend to respond with
    +            drainResponses(nodeResponses, clientResponse);
    +            return clientResponse;
    +        }
    +
    +        // Determine which responses are successful
    +        final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet());
    +        final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
    +
    +        // Choose any of the successful responses to be the 'chosen one'.
    +        final NodeResponse clientResponse = successResponses.iterator().next();
    +
    +        final EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
    +        if (merger == null) {
    +            return clientResponse;
    +        }
    +
    +        return merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse);
    +    }
    +
    +    @Override
    +    public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) {
    +        // Check if there are any 2xx responses
    +        final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses);
    +
    +        if (containsSuccessfulResponse) {
    +            // If there is a 2xx response, we consider a response to be problematic if it is not 2xx
    +            return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet());
    +        } else {
    +            // If no node is successful, we consider a problematic response to be only those that are 5xx
    +            return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet());
    +        }
    +    }
    +
    +    public static boolean isResponseInterpreted(final URI uri, final String httpMethod) {
    +        return getEndpointResponseMerger(uri, httpMethod) != null;
    +    }
    +
    +    private static EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) {
    +        return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null);
    +    }
    +
    +    private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) {
    +        return allResponses.stream().anyMatch(p -> p.is2xx());
    +    }
    +
    +
    +    private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) {
    +        responses.stream().parallel().filter(response -> response != exclude).forEach(response -> drainResponse(response));
    +    }
    +
    +    private void drainResponse(final NodeResponse response) {
    +        if (response.hasThrowable()) {
    +            return;
    +        }
    +
    +        try {
    +            ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream());
    +        } catch (final IOException ioe) {
    +            logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe);
    --- End diff --
    
    Are you sure you wanted to log error as INFO?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---