Posted to commits@nifi.apache.org by jg...@apache.org on 2021/12/08 20:59:50 UTC

[nifi] branch main updated: NIFI-9421: Running NiFi Stateless with local NARs only

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dafa03a  NIFI-9421: Running NiFi Stateless with local NARs only
dafa03a is described below

commit dafa03a21a7a9b017e621c4319a058ea9b291951
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Tue Nov 30 14:15:38 2021 +0100

    NIFI-9421: Running NiFi Stateless with local NARs only
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5557.
---
 nifi-external/nifi-kafka-connect/README.md         |  2 +
 nifi-stateless/nifi-stateless-assembly/README.md   |  7 +++-
 .../extensions/FileSystemExtensionRepository.java  | 46 +++++++++++++---------
 .../flow/StandardStatelessDataflowFactory.java     |  3 +-
 4 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/nifi-external/nifi-kafka-connect/README.md b/nifi-external/nifi-kafka-connect/README.md
index d8b62ad..c927b85 100644
--- a/nifi-external/nifi-kafka-connect/README.md
+++ b/nifi-external/nifi-kafka-connect/README.md
@@ -156,6 +156,7 @@ a result, Connect can be configured with the URL of a Nexus server. The example
 extensions. When a connector is started, it will first identify which extensions are necessary to run the dataflow, determine which extensions are available,
 and then automatically download any necessary extensions that it currently does not have available. If configuring a Nexus instance that has multiple repositories,
 the name of the repository should be included in the URL. For example: `https://nexus-private.myorganization.org/nexus/repository/my-repository/`.
+If the property is not specified, the necessary extensions (used by the flow) must be provided in the `extensions.directory` before deploying the connector.
 
 `(9) flow.snapshot`: Specifies the dataflow to run. This is the file that was downloaded by right-clicking on the Process Group in NiFi and
 clicking "Download flow". The dataflow can be stored external to the configured and the location can be represented as an HTTP (or HTTPS URL), or a filename.
@@ -292,6 +293,7 @@ a result, Connect can be configured with the URL of a Nexus server. The example
 extensions. When a connector is started, it will first identify which extensions are necessary to run the dataflow, determine which extensions are available,
 and then automatically download any necessary extensions that it currently does not have available. If configuring a Nexus instance that has multiple repositories,
 the name of the repository should be included in the URL. For example: `https://nexus-private.myorganization.org/nexus/repository/my-repository/`.
+If the property is not specified, the necessary extensions (used by the flow) must be provided in the `extensions.directory` before deploying the connector.
 
 `(9) flow.snapshot`: Specifies the dataflow to run. This is the file that was downloaded by right-clicking on the Process Group in NiFi and
 clicking "Download flow". The dataflow can be stored external to the configured and the location can be represented as an HTTP (or HTTPS URL), or a filename.
diff --git a/nifi-stateless/nifi-stateless-assembly/README.md b/nifi-stateless/nifi-stateless-assembly/README.md
index 2cddb33..eaece7e 100644
--- a/nifi-stateless/nifi-stateless-assembly/README.md
+++ b/nifi-stateless/nifi-stateless-assembly/README.md
@@ -221,8 +221,11 @@ The following properties may be used to indicate where extensions are to be loca
 
 When Stateless NiFi is started, it parses the provided dataflow and determines which bundles/extensions are necessary
 to run the dataflow. If an extension is not available, or the version referenced by the flow is not available, Stateless
-may attempt to download the extensions automatically. To do this, one or more Extension Clients must be configured.
-Each client is configured using several properties, which are all tied together using a 'key'. For example, if we have
+may attempt to download the extensions automatically. To do this, one or more Extension Clients need to be configured. If no
+Extension Clients are configured, only those extensions can be used that are already available (e.g. manually downloaded and copied offline)
+in the directories specified by the `nifi.stateless.extensions.directory` and `nifi.stateless.readonly.extensions.directory.<suffix>` properties described above.
+
+Each Extension Client is configured using several properties, which are all tied together using a 'key'. For example, if we have
 4 properties, `nifi.stateless.extension.client.ABC.type`, `nifi.stateless.extension.client.ABC.baseUrl`,
 `nifi.stateless.extension.client.XYZ.type`, and `nifi.stateless.extension.client.XYZ.baseUrl`, then we know that
 the first `type` property refers to the same client as the first `baseUrl` property because they both have the 'key'
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
index d007633..45df96c 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
@@ -25,6 +25,7 @@ import org.apache.nifi.nar.NarClassLoaders;
 import org.apache.nifi.nar.NarLoadResult;
 import org.apache.nifi.nar.NarUnpacker;
 import org.apache.nifi.stateless.engine.NarUnpackLock;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,40 +47,49 @@ public class FileSystemExtensionRepository implements ExtensionRepository {
 
     private final ExtensionDiscoveringManager extensionManager;
     private final NarClassLoaders narClassLoaders;
-    private final File writableLibDirectory;
+    private final File narDirectory;
+    private final File writableExtensionDirectory;
     private final Set<File> readOnlyExtensionDirectories;
     private final File workingDirectory;
     private final List<ExtensionClient> clients;
 
 
-    public FileSystemExtensionRepository(final ExtensionDiscoveringManager extensionManager, final File writableLibDirectory, final Collection<File> readOnlyExtensionDirectories,
-                                         final File workingDirectory, final NarClassLoaders narClassLoaders, final List<ExtensionClient> clients) {
+    public FileSystemExtensionRepository(final ExtensionDiscoveringManager extensionManager, final StatelessEngineConfiguration engineConfiguration, final NarClassLoaders narClassLoaders,
+                                         final List<ExtensionClient> clients) {
         this.extensionManager = extensionManager;
-        this.writableLibDirectory = writableLibDirectory;
-        this.readOnlyExtensionDirectories = readOnlyExtensionDirectories == null ? Collections.emptySet() : new HashSet<>(readOnlyExtensionDirectories);
-        this.workingDirectory = workingDirectory;
+        this.narDirectory = engineConfiguration.getNarDirectory();
+        this.writableExtensionDirectory = engineConfiguration.getExtensionsDirectory();
+        this.readOnlyExtensionDirectories = engineConfiguration.getReadOnlyExtensionsDirectories() == null
+                ? Collections.emptySet()
+                : new HashSet<>(engineConfiguration.getReadOnlyExtensionsDirectories());
+        this.workingDirectory = engineConfiguration.getWorkingDirectory();
         this.narClassLoaders = narClassLoaders;
         this.clients = clients;
     }
 
     @Override
     public void initialize() throws IOException {
-        if (readOnlyExtensionDirectories.isEmpty()) {
-            return;
+        final Set<File> narFiles = new HashSet<>();
+
+        // if nar.directory and extensions.directory are the same, StatelessBootstrap has already loaded the nars
+        if (writableExtensionDirectory != null && !writableExtensionDirectory.equals(narDirectory)) {
+            narFiles.addAll(listNarFiles(writableExtensionDirectory));
         }
 
-        final Set<File> readOnlyNars = new HashSet<>();
         for (final File extensionDir : readOnlyExtensionDirectories) {
-            final File[] narFiles = extensionDir.listFiles(file -> file.getName().endsWith(".nar"));
-            if (narFiles == null) {
-                logger.warn("Failed to perform listing of read-only extensions directory {}. Will not load extensions from this directory.", extensionDir.getAbsolutePath());
-                continue;
-            }
-
-            readOnlyNars.addAll(Arrays.asList(narFiles));
+            narFiles.addAll(listNarFiles(extensionDir));
         }
 
-        loadExtensions(readOnlyNars);
+        loadExtensions(narFiles);
+    }
+
+    private Collection<File> listNarFiles(File extensionDir) {
+        final File[] narFiles = extensionDir.listFiles(file -> file.getName().endsWith(".nar"));
+        if (narFiles == null) {
+            logger.warn("Failed to perform listing of extensions directory {}. Will not preload extensions from this directory.", extensionDir.getAbsolutePath());
+            return Collections.emptyList();
+        }
+        return Arrays.asList(narFiles);
     }
 
     @Override
@@ -111,7 +121,7 @@ public class FileSystemExtensionRepository implements ExtensionRepository {
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
 
-        final DownloadQueue downloadQueue = new DownloadQueue(extensionManager, executorService, concurrentDownloads, bundleCoordinates, writableLibDirectory, clients);
+        final DownloadQueue downloadQueue = new DownloadQueue(extensionManager, executorService, concurrentDownloads, bundleCoordinates, writableExtensionDirectory, clients);
         final CompletableFuture<Void> downloadFuture = downloadQueue.download();
         logger.info("Beginning download of extensions {}", bundleCoordinates);
 
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 3f64964..be85c5a 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -142,8 +142,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
                 extensionClients.add(extensionClient);
             }
 
-            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getExtensionsDirectory(),
-                engineConfiguration.getReadOnlyExtensionsDirectories(), engineConfiguration.getWorkingDirectory(), narClassLoaders, extensionClients);
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration, narClassLoaders, extensionClients);
             extensionRepository.initialize();
 
             final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
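
The preload logic that this commit adds to `FileSystemExtensionRepository.initialize()` can be sketched in isolation as below. This is a minimal illustration, not the committed code. The class `NarPreloadSketch` and the method `collectNarFiles` are hypothetical names, the three directories are passed in directly rather than read from `StatelessEngineConfiguration`, and the warning log and the `loadExtensions` call are left out.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the directory-scanning part of initialize().
final class NarPreloadSketch {

    // Lists the *.nar files directly inside a directory.
    // File.listFiles returns null if the listing fails (e.g. the path is not a directory);
    // in that case an empty list is returned so the caller can carry on.
    static Collection<File> listNarFiles(final File extensionDir) {
        final File[] narFiles = extensionDir.listFiles(file -> file.getName().endsWith(".nar"));
        if (narFiles == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(narFiles);
    }

    // Collects the NAR files that would be preloaded.
    // The writable extensions directory is skipped when it equals the NAR directory,
    // because, per the comment in the diff, those NARs have already been loaded at bootstrap.
    static Set<File> collectNarFiles(final File narDirectory,
                                     final File writableExtensionDirectory,
                                     final Collection<File> readOnlyExtensionDirectories) {
        final Set<File> narFiles = new HashSet<>();

        if (writableExtensionDirectory != null && !writableExtensionDirectory.equals(narDirectory)) {
            narFiles.addAll(listNarFiles(writableExtensionDirectory));
        }

        for (final File extensionDir : readOnlyExtensionDirectories) {
            narFiles.addAll(listNarFiles(extensionDir));
        }

        return narFiles;
    }
}
```

In this sketch, NARs copied manually into the writable extensions directory are picked up even when no read-only directories are configured. The previous `initialize()` returned early in that case.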