Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/12/01 11:51:43 UTC

[GitHub] [nifi] simonbence commented on a diff in pull request #6736: NIFI-10918: When fetching a flow from a Flow Registry, if it referenc…

simonbence commented on code in PR #6736:
URL: https://github.com/apache/nifi/pull/6736#discussion_r1037010206


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);

Review Comment:
   It would be worth including details about the registry and the flow in this log message.
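A minimal sketch of a more descriptive failure message follows. It assumes the failure log should carry the same bucket, flow, version and client details as the success message, plus the storage location. The helper name `describeFetchFailure`, its parameters and the example values are hypothetical and are not part of the PR.

```java
public class FetchFailureMessage {

    // Hypothetical helper: builds a message that identifies which Registry Client
    // failed and which flow coordinates it was asked to fetch.
    static String describeFetchFailure(final String bucketId, final String flowId, final int version,
                                       final String clientId, final String storageLocation) {
        return String.format(
            "Failed to fetch flow for Bucket [%s] Flow [%s] Version [%d] with Storage Location [%s] using Registry Client [%s]",
            bucketId, flowId, version, storageLocation, clientId);
    }

    public static void main(final String[] args) {
        // Made-up identifiers, for illustration only.
        System.out.println(describeFetchFailure("bucket-1", "flow-1", 3, "client-A", "http://registry.example:18080"));
    }
}
```

In the actual code, SLF4J's parameterized form would avoid building the string eagerly. For example, `logger.debug("Failed to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode, e);` works because SLF4J logs a trailing Throwable that has no matching `{}` placeholder as the exception.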



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java:
##########
@@ -332,14 +333,37 @@ private void populateVersionedContentsRecursively(final FlowRegistryClientUserCo
         }
     }
 
-    private FlowRegistryClientNode getRegistryForInternalFlow(final String storageLocation) throws FlowRegistryException, IOException {
-        for (FlowRegistryClientNode registryClientNode : flowManager.getAllFlowRegistryClients()) {
-            if (registryClientNode.isStorageLocationApplicable(storageLocation)) {
-                return registryClientNode;
+    private RegisteredFlowSnapshot fetchFlowContents(final FlowRegistryClientUserContext context, final VersionedFlowCoordinates coordinates,
+                                                     final boolean fetchRemoteFlows) throws FlowRegistryException {
+
+        final String storageLocation = coordinates.getStorageLocation() == null ? coordinates.getRegistryUrl() : coordinates.getStorageLocation();
+        final String bucketId = coordinates.getBucketId();
+        final String flowId = coordinates.getFlowId();
+        final int version = coordinates.getVersion();
+
+        final List<FlowRegistryClientNode> clientNodes = getRegistryClientsForInternalFlow(storageLocation);
+        for (final FlowRegistryClientNode clientNode : clientNodes) {
+            try {
+                logger.debug("Attempting to fetch flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                final RegisteredFlowSnapshot snapshot = clientNode.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+                coordinates.setRegistryId(clientNode.getIdentifier());
+
+                logger.debug("Successfully fetched flow for Bucket [{}] Flow [{}] Version [{}] using {}", bucketId, flowId, version, clientNode);
+                return snapshot;
+            } catch (final Exception e) {
+                logger.debug("Failed to fetch flow", e);
             }
         }
 
-        throw new FlowRegistryException(String.format("No applicable registry found for storage location %s", storageLocation));
+        throw new FlowRegistryException(String.format("Could not find any Registry Client that was able to fetch flow with Bucket [%s] Flow [%s] Version [%s] with Storage Location [%s]",
+            bucketId, flowId, version, storageLocation));
+    }
+
+    private List<FlowRegistryClientNode> getRegistryClientsForInternalFlow(final String storageLocation) {
+        // Sort clients based on whether or not they believe they are applicable for the given storage location
+        final List<FlowRegistryClientNode> matchingClients = new ArrayList<>(flowManager.getAllFlowRegistryClients());
+        matchingClients.sort(Comparator.comparing(client -> client.isStorageLocationApplicable(storageLocation) ? -1 : 1));
+        return matchingClients;

Review Comment:
   Have you considered using isStorageLocationApplicable as a filter instead, as an extra safeguard? That is, rather than trying every client, only try the clients that return true for that call. My reasoning: flow and bucket identifiers are generated by the registry implementations, which may differ from case to case. The IDs could, for example, be sequential, so unrelated flows in different registries might share the same bucket and flow ID. If the bucket ID is 1 and the flow ID is 1, and there are multiple registry instances with such an implementation, the first one that succeeds will be loaded. That flow does not necessarily "match" the storage location, so an unexpected flow might be loaded.
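The following sketch contrasts the two strategies under simplified, hypothetical types. `Client` stands in for `FlowRegistryClientNode`, and flows are looked up by a `"bucketId/flowId"` key. Missing flows are reported as `Optional.empty()` rather than the `FlowRegistryException` the PR throws. `fetchSorted` mirrors the sort-then-try-all approach in the diff. `fetchFiltered` is the reviewer's suggestion.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RegistryClientSelection {

    // Hypothetical stand-in for FlowRegistryClientNode: serves one storage location
    // and holds flows keyed by "bucketId/flowId".
    static final class Client {
        final String id;
        final String storageLocation;
        final Map<String, String> flows;

        Client(final String id, final String storageLocation, final Map<String, String> flows) {
            this.id = id;
            this.storageLocation = storageLocation;
            this.flows = flows;
        }

        boolean isStorageLocationApplicable(final String location) {
            return storageLocation.equals(location);
        }

        Optional<String> fetch(final String bucketId, final String flowId) {
            return Optional.ofNullable(flows.get(bucketId + "/" + flowId));
        }
    }

    // Like the PR: every client is tried, applicable ones first (List.sort is stable).
    static Optional<String> fetchSorted(final List<Client> clients, final String location,
                                        final String bucketId, final String flowId) {
        final List<Client> ordered = new ArrayList<>(clients);
        ordered.sort(Comparator.comparing((Client c) -> c.isStorageLocationApplicable(location) ? -1 : 1));
        for (final Client c : ordered) {
            final Optional<String> flow = c.fetch(bucketId, flowId);
            if (flow.isPresent()) {
                return flow;
            }
        }
        return Optional.empty();
    }

    // Reviewer's suggestion: only clients that claim the storage location are tried.
    static Optional<String> fetchFiltered(final List<Client> clients, final String location,
                                          final String bucketId, final String flowId) {
        for (final Client c : clients) {
            if (!c.isStorageLocationApplicable(location)) {
                continue; // skip clients that do not recognize this storage location
            }
            final Optional<String> flow = c.fetch(bucketId, flowId);
            if (flow.isPresent()) {
                return flow;
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Two registries with sequential IDs, so bucket 1 / flow 1 exists in both.
        final Client a = new Client("A", "http://registry-a", Map.of("1/1", "flow from A"));
        final Client b = new Client("B", "http://registry-b", Map.of("1/1", "flow from B"));
        final List<Client> clients = List.of(a, b);

        // The requested flow belongs to registry-c, which no configured client serves.
        System.out.println(fetchSorted(clients, "http://registry-c", "1", "1"));   // Optional[flow from A]
        System.out.println(fetchFiltered(clients, "http://registry-c", "1", "1")); // Optional.empty
    }
}
```

No client claims `http://registry-c`, yet the sorted approach still returns registry A's unrelated flow `1/1`, while the filtered approach returns nothing. The trade-off is that filtering removes the fallback: a client that does not recognize the storage location is never tried.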



-- 
This is an automated message from the Apache Git Service.