You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2023/01/25 16:55:34 UTC

[nifi] branch main updated: NIFI-11001: This closes #6836. When new components are added, track a mapping of its Versioned Compoenent ID to the component itself. Then, use this when creating connections. This is important because if a Process Group has multiple components with the same Versioned Component ID across multiple child groups, the Versioned Component ID may be changed upon adding the component. Maintaining this mapping allows us to still create the connection correctly.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 228a3441b5 NIFI-11001: This closes #6836. When new components are added, track a mapping of its Versioned Compoenent ID to the component itself. Then, use this when creating connections. This is important because if a Process Group has multiple components with the same Versioned Component ID across multiple child groups, the Versioned Component ID may be changed upon adding the component. Maintaining this mapping allows us to still create the connection correctly.
228a3441b5 is described below

commit 228a3441b56278dbaf3d46bec5d8246e74da10b1
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Jan 11 11:18:37 2023 -0500

    NIFI-11001: This closes #6836. When new components are added, track a mapping of its Versioned Compoenent ID to the component itself. Then, use this when creating connections. This is important because if a Process Group has multiple components with the same Versioned Component ID across multiple child groups, the Versioned Component ID may be changed upon adding the component. Maintaining this mapping allows us to still create the connection correctly.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../ConnectableAdditionTracker.java                | 76 ++++++++++++++++++++++
 .../StandardVersionedComponentSynchronizer.java    | 18 ++++-
 2 files changed, 92 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/ConnectableAdditionTracker.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/ConnectableAdditionTracker.java
new file mode 100644
index 0000000000..6c545a5197
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/ConnectableAdditionTracker.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow.synchronization;
+
+import org.apache.nifi.connectable.Connectable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class ConnectableAdditionTracker {
+    private static final Logger logger = LoggerFactory.getLogger(ConnectableAdditionTracker.class);
+
+    private final Map<ComponentKey, Connectable> tracking = new HashMap<>();
+
+    public void addComponent(final String instantiatedGroupId, final String versionedComponentId, final Connectable component) {
+        final ComponentKey key = new ComponentKey(instantiatedGroupId, versionedComponentId);
+        if (tracking.containsKey(key)) {
+            logger.debug("Component [{}] and Versioned Component ID [{}] added to Process Group [{}] but component with same Versioned ID already added",
+                component, versionedComponentId, instantiatedGroupId);
+        }
+
+        tracking.putIfAbsent(key, component);
+    }
+
+    public Optional<Connectable> getComponent(final String instantiatedGroupId, final String versionedComponentId) {
+        final ComponentKey key = new ComponentKey(instantiatedGroupId, versionedComponentId);
+        return Optional.ofNullable(tracking.get(key));
+    }
+
+    private static class ComponentKey {
+        private final String instantiatedGroupId;
+        private final String versionedComponentId;
+
+        public ComponentKey(final String instantiatedGroupid, final String versionedComponentId) {
+            this.instantiatedGroupId = instantiatedGroupid;
+            this.versionedComponentId = versionedComponentId;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final ComponentKey that = (ComponentKey) o;
+            return Objects.equals(instantiatedGroupId, that.instantiatedGroupId)
+                && Objects.equals(versionedComponentId, that.versionedComponentId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(instantiatedGroupId, versionedComponentId);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 00743a44e6..5e96c0a506 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -153,6 +153,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
 
     private Set<String> preExistingVariables = new HashSet<>();
     private FlowSynchronizationOptions syncOptions;
+    private final ConnectableAdditionTracker connectableAdditionTracker = new ConnectableAdditionTracker();
 
     public StandardVersionedComponentSynchronizer(final VersionedFlowSynchronizationContext context) {
         this.context = context;
@@ -2200,6 +2201,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         funnel.setVersionedComponentId(proposed.getIdentifier());
         destination.addFunnel(funnel);
         updateFunnel(funnel, proposed);
+        connectableAdditionTracker.addComponent(destination.getIdentifier(), proposed.getIdentifier(), funnel);
 
         return funnel;
     }
@@ -2336,6 +2338,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addInputPort(port);
         updatePort(port, proposed, temporaryName);
+        connectableAdditionTracker.addComponent(destination.getIdentifier(), proposed.getIdentifier(), port);
 
         return port;
     }
@@ -2354,6 +2357,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addOutputPort(port);
         updatePort(port, proposed, temporaryName);
+        connectableAdditionTracker.addComponent(destination.getIdentifier(), proposed.getIdentifier(), port);
 
         return port;
     }
@@ -2393,6 +2397,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         // Notify the processor node that the configuration (properties, e.g.) has been restored
         final ProcessContext processContext = context.getProcessContextFactory().apply(procNode);
         procNode.onConfigurationRestored(processContext);
+        connectableAdditionTracker.addComponent(destination.getIdentifier(), proposed.getIdentifier(), procNode);
 
         return procNode;
     }
@@ -3275,8 +3280,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
 
         // If we're synchronizing and the component is not available by the instance ID, lookup the component by the ID instead.
         final Connectable connectableById = getConnectable(group, connectableComponent, ConnectableComponent::getId);
-        LOG.debug("Found no connectable in Process Group {} by Instance ID. Lookup by ID {} yielded {}", connectable, connectableComponent.getId(), connectableById);
-        return connectableById;
+        LOG.debug("Found no connectable in Process Group {} by Instance ID. Lookup by ID {} yielded {}", group, connectableComponent.getId(), connectableById);
+        if (connectableById != null) {
+            return connectableById;
+        }
+
+        final Optional<Connectable> addedComponent = connectableAdditionTracker.getComponent(group.getIdentifier(), connectableComponent.getId());
+        if (addedComponent.isPresent()) {
+            LOG.debug("Found Connectable in Process Group {} as newly added component {}", group, addedComponent.get());
+        }
+
+        return addedComponent.orElse(null);
     }
 
     private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent, final Function<ConnectableComponent, String> idFunction) {