Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/30 23:08:40 UTC

[GitHub] [nifi] markap14 opened a new pull request, #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

markap14 opened a new pull request, #6736:
URL: https://github.com/apache/nifi/pull/6736

   …es any 'internal versioned flows', instead of requiring that we have a client configured for the appropriate URL, attempt to fetch the flow from each client. We start with the clients that report that they can handle the URL, but try the others as well. As soon as we successfully fetch the flow, we stop.
   
   # Summary
   
   [NIFI-00000](https://issues.apache.org/jira/browse/NIFI-00000)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] simonbence commented on a diff in pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

Posted by GitBox <gi...@apache.org>.
simonbence commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037010206


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);

Review Comment:
   It would be worth including details about the registry and the flow in this log message.
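
A minimal sketch of what such a message could look like, assuming a hypothetical helper `describeFailure` (not part of the PR) and using `java.util.logging` in place of the SLF4J logger seen in the diff so that the example stays self-contained:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: builds a failure message that names the flow and the client.
final class FetchFailureLogSketch {
    private static final Logger logger = Logger.getLogger(FetchFailureLogSketch.class.getName());

    // Mirrors the bracketed style of the "Attempting to fetch flow" message in the diff
    static String describeFailure(final String bucketId, final String flowId, final int version, final String clientName) {
        return String.format("Failed to fetch flow for Bucket [%s] Flow [%s] Version [%s] using %s",
            bucketId, flowId, version, clientName);
    }

    public static void main(final String[] args) {
        final Exception cause = new IllegalStateException("registry unreachable");
        // Level.FINE is used here as a rough stand-in for a debug-level log call
        logger.log(Level.FINE, describeFailure("bucket-1", "flow-1", 3, "client-A"), cause);
        System.out.println(describeFailure("bucket-1", "flow-1", 3, "client-A"));
    }
}
```

The identifiers and the client name passed in `main` are made-up values for illustration only.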



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);
             }
         }
 
-        throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation));
+        throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]",
+            bucketId, flowId, version, storageLocation));
+    }
+
+    private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) {
+        // Sort clients based on whether or not they believe they are applicable for the given storage location
+        final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients());
+        matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+        return matchingClients;

Review Comment:
   Have you considered using `isStorageLocationApplicable` as a filter instead, as an extra safeguard? That is, try only the clients that return true for that call rather than every client. My reasoning: bucket and flow identifiers are generated by the registry implementations, which may differ from one implementation to another. The ids could, for example, be sequential, in which case unrelated flows in different registries could share the same bucket id and flow id. If the bucket id is 1 and the flow id is 1, and several registry instances use such an implementation, the flow from the first one tried will be loaded. That flow does not necessarily match the storage location, so an unexpected flow might be loaded.
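
The filtering alternative described in this comment could be sketched roughly as follows. `RegistryClient` is a hypothetical stand-in for `FlowRegistryClientNode`, reduced to the single method used here, and the URLs are made up:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FilterApplicableClientsSketch {
    // Hypothetical stand-in for FlowRegistryClientNode, reduced to the one method used here
    interface RegistryClient {
        boolean isStorageLocationApplicable(String storageLocation);
    }

    // Keeps only the clients that claim the storage location, instead of trying every client
    static List<RegistryClient> applicableClients(final List<RegistryClient> allClients, final String storageLocation) {
        return allClients.stream()
            .filter(client -> client.isStorageLocationApplicable(storageLocation))
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        final RegistryClient a = location -> location.startsWith("http://registry-a");
        final RegistryClient b = location -> location.startsWith("http://registry-b");
        // Only client b claims this location, so only b would be asked for the flow
        System.out.println(applicableClients(Arrays.asList(a, b), "http://registry-b:18080").size());
    }
}
```

With a filter like this, a flow whose recorded storage location no longer matches any configured client would not be fetched at all, which is the cost of the extra safeguard.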





[GitHub] [nifi] asfgit closed pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…
URL: https://github.com/apache/nifi/pull/6736




[GitHub] [nifi] markap14 commented on a diff in pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037128202


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);
             }
         }
 
-        throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation));
+        throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]",
+            bucketId, flowId, version, storageLocation));
+    }
+
+    private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) {
+        // Sort clients based on whether or not they believe they are applicable for the given storage location
+        final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients());
+        matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+        return matchingClients;

Review Comment:
   Interesting, I didn't consider the idea of buckets and flows having one-up IDs. I suppose it's possible. We could update the documentation to recommend against that.
   
   We need to check all registry clients, however, because a URL is not sufficient to know whether or not a given client is applicable. For example, if the port or hostname of the registry changes (as in the example in the Jira, when the registry went from being non-secure to secure) all of a sudden we can no longer load any flow that has an 'inner versioned flow' simply because the URL is no longer accurate.
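
The scenario described above can be sketched in isolation. This is an illustration under assumptions, not the PR's code: `RegistryClient` and `FakeClient` are hypothetical stand-ins for `FlowRegistryClientNode`, and the URLs and flow ids are made up. The ordering step uses the same `Comparator.comparing` expression as the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TryAllClientsSketch {
    // Hypothetical stand-in for FlowRegistryClientNode, reduced to what this sketch needs
    interface RegistryClient {
        String name();
        boolean isStorageLocationApplicable(String storageLocation);
        String fetch(String flowId) throws Exception;
    }

    // Fake client that serves a fixed set of flow ids from a fixed base URL
    static final class FakeClient implements RegistryClient {
        private final String name;
        private final String baseUrl;
        private final Set<String> flowIds;

        FakeClient(final String name, final String baseUrl, final String... flowIds) {
            this.name = name;
            this.baseUrl = baseUrl;
            this.flowIds = new HashSet<>(Arrays.asList(flowIds));
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public boolean isStorageLocationApplicable(final String storageLocation) {
            return storageLocation.startsWith(baseUrl);
        }

        @Override
        public String fetch(final String flowId) throws Exception {
            if (!flowIds.contains(flowId)) {
                throw new Exception("Flow " + flowId + " not found in " + name);
            }
            return "contents of " + flowId;
        }
    }

    // Clients that claim the storage location sort first; the rest are kept as fallbacks
    static List<RegistryClient> ordered(final List<RegistryClient> clients, final String storageLocation) {
        final List<RegistryClient> sorted = new ArrayList<>(clients);
        sorted.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
        return sorted;
    }

    // Tries each client in order and returns as soon as one of them succeeds
    static String fetchFromAny(final List<RegistryClient> clients, final String storageLocation, final String flowId) {
        for (final RegistryClient client : ordered(clients, storageLocation)) {
            try {
                return client.name() + ":" + client.fetch(flowId);
            } catch (final Exception e) {
                // This client could not provide the flow; move on to the next one
            }
        }
        throw new IllegalStateException("No client could fetch flow " + flowId);
    }

    public static void main(final String[] args) {
        final RegistryClient other = new FakeClient("other", "https://other:18443");
        final RegistryClient secured = new FakeClient("secured", "https://registry:18443", "flow-1");
        // The flow was recorded against the old, non-secure URL, which no client claims any more
        System.out.println(fetchFromAny(Arrays.asList(other, secured), "http://registry:18080", "flow-1"));
    }
}
```

In this sketch neither client reports the old URL as applicable, yet the fetch still succeeds through the fallback. A filter on `isStorageLocationApplicable` would have left no client to try.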





[GitHub] [nifi] simonbence commented on pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

Posted by GitBox <gi...@apache.org>.
simonbence commented on PR #6736:
URL: https://github.com/apache/nifi/pull/6736#issuecomment-1335181613

   The code passed the test case, and the debug log shows that it first tries to load the flow from the incorrect repository, then falls back and loads it correctly. I will merge.




[GitHub] [nifi] markap14 commented on a diff in pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

Posted by GitBox <gi...@apache.org>.
markap14 commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037122363


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);

Review Comment:
   I didn't bother with logging that because I thought it too verbose. Specifically, because it's at a debug level and about 5 lines above we log very specifically the flow that we're about to fetch. Because of that, I didn't want to overcomplicate the log message.





[GitHub] [nifi] simonbence commented on pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

Posted by GitBox <gi...@apache.org>.
simonbence commented on PR #6736:
URL: https://github.com/apache/nifi/pull/6736#issuecomment-1335149789

   I am running the test case I added to the ticket. If it succeeds, I will merge this into main.

