Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/03/17 20:09:46 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #5696: NIFI-9615 Extending capabilities of NAR provider with restraing, conflict resolution strategy and refactors to make it more flexible

markap14 commented on a change in pull request #5696:
URL: https://github.com/apache/nifi/pull/5696#discussion_r829382858



##########
File path: nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
##########
@@ -510,6 +510,18 @@ public Integer getIntegerProperty(final String propertyName, final Integer defau
         }
     }
 
+    public Long getLongProperty(final String propertyName, final Long defaultValue) {
+        final String value = getProperty(propertyName);
+        if (value == null || value.trim().isEmpty()) {
+            return defaultValue;
+        }
+
+        try {
+            return Long.parseLong(value.trim());
+        } catch (final Exception e) {
+            return defaultValue;

Review comment:
       If we ignore a value in nifi.properties, we should log a WARNING indicating that the configured value is invalid so we're falling back to a default.
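
A minimal sketch of this suggestion, assuming a hypothetical helper class `LongPropertyParser` rather than the real `NiFiProperties`, and using `java.util.logging` only to keep the example self-contained (NiFi itself logs through SLF4J):

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the lookup in NiFiProperties; not the actual NiFi class.
final class LongPropertyParser {
    private static final Logger LOGGER = Logger.getLogger(LongPropertyParser.class.getName());

    // Returns the parsed value, or the default (after logging a WARNING) when the
    // configured value is present but is not a valid long.
    static Long parseLong(final String propertyName, final String rawValue, final Long defaultValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return defaultValue; // nothing configured, so there is nothing to warn about
        }

        try {
            return Long.parseLong(rawValue.trim());
        } catch (final NumberFormatException e) {
            LOGGER.warning(String.format(
                    "Property %s has invalid value '%s'; falling back to default %s",
                    propertyName, rawValue, defaultValue));
            return defaultValue;
        }
    }
}
```

Catching `NumberFormatException` instead of `Exception` keeps unrelated failures from being silently turned into the default.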

##########
File path: nifi-docs/src/main/asciidoc/administration-guide.adoc
##########
@@ -4456,24 +4465,40 @@ Ensure that the file has appropriate permissions for the nifi user and group.
 
 Refresh the browser page and the custom processor should now be available when adding a new Processor to your flow.
 
+[[nar-providers]]
 === NAR Providers
 
-NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using NAR Providers. A NAR Provider serves as a connector between an external data store
-and NiFi.
+NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using External Resource Providers.
+
+An External Resource Provider serves as a connector between an external data source and NiFi.
 
-When configured, a NAR Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
+When configured, an External Resource Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
 the `nifi.nar.library.autoload.directory` for autoloading.
 
-NAR Provider can be configured by adding the `nifi.nar.library.provider.<providerName>.implementation` property with value containing the proper implementation class. Some implementations might need
+By default, the polling will happen in every 5 minutes. It is possible to change this frequency by specifying the property `nifi.nar.library.poll.interval` with the number of milliseconds between polls.
+
+By default NAR files will be downloaded if no file with the same name exists in the folder defined by `nifi.nar.library.autoload.directory`. By setting the `nifi.nar.library.conflict.resolution` other conflict resolution strategies might be applied. Currently, the following strategies are supported:
+
+|===
+| Name                                                               |  Description
+| org.apache.nifi.flow.resource.DoNotReplaceResolutionStrategy       |  Will not replace files: if a file exists in the directory with the same name, it will not be downloaded again.
+| org.apache.nifi.flow.resource.ReplaceWithNewerResolutionStrategy   |  Will replace a file in the target directory if there is an available file in the source but with newer modification date.

Review comment:
       It might make more sense to make these configuration options "REPLACE" vs. "IGNORE" in order to align with the strategies that we provide in PutFile, etc.
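
One way this could look, as a sketch only: the enum name `ConflictResolution` and the `fromProperty` helper are hypothetical and do not appear in the pull request.

```java
import java.util.Locale;

// Hypothetical enum mirroring the "REPLACE" vs. "IGNORE" wording suggested above.
enum ConflictResolution {
    REPLACE,
    IGNORE;

    // Maps a configured string to a strategy. A blank value yields the default;
    // an unknown value makes valueOf throw IllegalArgumentException.
    static ConflictResolution fromProperty(final String value, final ConflictResolution defaultValue) {
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

With short names like these, the administrator does not need to type a fully qualified class name into `nifi.properties`.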

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/BufferingExternalResourceProviderWorker.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.apache.nifi.nar.NarCloseable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * It is important to note that this implementation prevents NiFi from catching up a file under writing. For example trying to load a NAR
+ * file which is not fully acquired for example could lead to issues. In order to avoid this, the worker first creates a temporary
+ * file and it will rename it to the expected name only after it has been successfully written to the disk.
+ */
+final class BufferingExternalResourceProviderWorker extends ConflictResolvingExternalResourceProviderWorker {

Review comment:
       I'm not sure that "Buffering" is the right word here - it doesn't buffer anything. It writes it directly out to a file. Buffering would imply that it holds the contents in memory. "CollisionAwareResourceProvider" or even "RenamingResourceProvider" may make more sense

##########
File path: nifi-docs/src/main/asciidoc/administration-guide.adoc
##########
@@ -4456,24 +4465,40 @@ Ensure that the file has appropriate permissions for the nifi user and group.
 
 Refresh the browser page and the custom processor should now be available when adding a new Processor to your flow.
 
+[[nar-providers]]
 === NAR Providers
 
-NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using NAR Providers. A NAR Provider serves as a connector between an external data store
-and NiFi.
+NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using External Resource Providers.
+
+An External Resource Provider serves as a connector between an external data source and NiFi.
 
-When configured, a NAR Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
+When configured, an External Resource Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
 the `nifi.nar.library.autoload.directory` for autoloading.
 
-NAR Provider can be configured by adding the `nifi.nar.library.provider.<providerName>.implementation` property with value containing the proper implementation class. Some implementations might need
+By default, the polling will happen in every 5 minutes. It is possible to change this frequency by specifying the property `nifi.nar.library.poll.interval` with the number of milliseconds between polls.

Review comment:
       We should not support any properties that require the unit be in milliseconds. Rather, we should support "5 mins", "1500 secs" etc. If that happens already, we should be more accurate in our documentation. If that doesn't happen already, we need to fix that.
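
To illustrate what accepting values such as "5 mins" or "1500 secs" involves, here is a small self-contained sketch. `PollIntervalParser` and the set of unit spellings it accepts are assumptions made for the example; it is not NiFi's own time-period parsing.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for values such as "5 mins" or "1500 secs".
final class PollIntervalParser {
    // A number, optional whitespace, then one of a few assumed unit spellings.
    private static final Pattern DURATION =
            Pattern.compile("(\\d+)\\s*(ms|millis|secs?|mins?|hrs?|hours?)");

    static long toMillis(final String value) {
        final Matcher matcher = DURATION.matcher(value.trim().toLowerCase(Locale.ROOT));
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a valid time period: '" + value + "'");
        }

        final long amount = Long.parseLong(matcher.group(1));
        final String unit = matcher.group(2);

        final TimeUnit timeUnit;
        if (unit.equals("ms") || unit.equals("millis")) {
            timeUnit = TimeUnit.MILLISECONDS;
        } else if (unit.startsWith("s")) {
            timeUnit = TimeUnit.SECONDS;
        } else if (unit.startsWith("m")) {
            timeUnit = TimeUnit.MINUTES;
        } else {
            timeUnit = TimeUnit.HOURS;
        }
        return timeUnit.toMillis(amount);
    }
}
```

A bare number such as `300000` is rejected by this sketch, which is the point of the comment: the unit has to be stated explicitly.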

##########
File path: nifi-api/src/main/java/org/apache/nifi/flow/resource/ExternalResourceProviderInitializationContext.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import javax.net.ssl.SSLContext;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * Contains necessary information for extensions of external resource provider functionality.
+ */
+public interface ExternalResourceProviderInitializationContext {
+
+    /**
+     * @return Returns with the available properties.
+     */
+    Map<String, String> getProperties();
+
+    /**
+     * @return An optional predicate, which if presents might filter out unwanted files from the external source during listing.
+     */
+    default Optional<Predicate<ExternalResourceDescriptor>> getFilter() {
+        return Optional.empty();
+    }

Review comment:
       It feels a bit odd to me to have a default implementation that returns an `Optional` - so that if a concrete class wants to override it, they will have to override and then return an `Optional`... I wonder if it makes more sense to just return a `Predicate` with a default implementation that returns `descriptor -> true`?
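
A sketch of the alternative, assuming a simplified, hypothetical interface `FilterContext<T>` in place of `ExternalResourceProviderInitializationContext`, with the descriptor type left generic so the example compiles on its own:

```java
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for the interface under review.
interface FilterContext<T> {

    // The default accepts every descriptor, so callers never have to unwrap an Optional.
    default Predicate<T> getFilter() {
        return descriptor -> true;
    }
}
```

An implementation that wants filtering overrides `getFilter()` and returns its own predicate; everything else inherits the accept-all behaviour.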

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/ConflictResolvingExternalResourceProviderWorker.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This implementation of the worker uses a strategy in order to decide if an available resources is to download. Using
+ * different strategies result different behaviour. Like for example with {@code DoNotReplaceResolutionStrategy}
+ * the worker will not download a file with the same name until the given file is in the target directory, but in the other hand
+ * {@code ReplaceWithNewerResolutionStrategy} will replace resources if a new version is available in the external source.
+ */
+abstract class ConflictResolvingExternalResourceProviderWorker implements ExternalResourceProviderWorker {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConflictResolvingExternalResourceProviderWorker.class);
+
+    // A unique id is necessary for temporary files not to collide with temporary files from other instances.
+    private final String id = UUID.randomUUID().toString();
+
+    private final String name;
+    private final ClassLoader providerClassLoader;
+    private final ExternalResourceProvider provider;
+    private final File targetDirectory;
+    private final long pollTimeInMs;
+    private final ExternalResourceConflictResolutionStrategy resolutionStrategy;
+    private final CountDownLatch restrainStartupLatch;
+
+    private volatile boolean stopped = false;
+
+    ConflictResolvingExternalResourceProviderWorker(
+        final String namePrefix,
+        final ClassLoader providerClassLoader,
+        final ExternalResourceProvider provider,
+        final ExternalResourceConflictResolutionStrategy resolutionStrategy,
+        final File targetDirectory,
+        final long pollTimeInMs,
+        final CountDownLatch restrainStartupLatch
+    ) {
+        this.name = namePrefix + " - " + id;
+        this.providerClassLoader = providerClassLoader;
+        this.provider = provider;
+        this.resolutionStrategy = resolutionStrategy;
+        this.targetDirectory = targetDirectory;
+        this.pollTimeInMs = pollTimeInMs;
+        this.restrainStartupLatch = restrainStartupLatch;
+    }
+
+    @Override
+    public void run() {
+        LOGGER.info("External resource provider worker is started");
+
+        while (!stopped) {
+            try {
+                FileUtils.ensureDirectoryExistAndCanReadAndWrite(targetDirectory);
+            } catch (final IOException e) {
+                LOGGER.error("Could not ensure that target directory is accessible", e);
+                stopped = true;
+            }
+
+            if (!stopped) {
+                try {
+                    poll();
+                    restrainStartupLatch.countDown();

Review comment:
       This appears to be a threading bug.
   Consider a case where we have 3 providers: `a`, `b`, and `c`. So we have a CountDownLatch initialized to `3`.
   We also have a poll time of 30 seconds.
   Now, consider that `a` and `b` have no data to grab, but `c` has a lot of data. `c` may take 10 minutes to pull all data, for example.
   What happens here is that `a` runs, finishes its first iteration very quickly. Then it sleeps for 30 seconds.
   Meanwhile, `b` does the same thing.
   While `c` is still working on its first iteration, both `a` and `b` perform their second iteration and call `countDown` - we've now counted down 4 times, which has released the latch. But `c` is still working on its first iteration.
   
   Perhaps an easy solution here is to keep a 'loop counter' of how many times we've polled. Only call `countDown` if it's the first polling iteration.
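
The idea can be sketched as follows. `FirstPollLatchWorker` is a hypothetical, stripped-down worker; a boolean flag is used in place of a full loop counter, and a `Runnable` stands in for the real `poll()` call.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical worker that counts the shared latch down at most once.
final class FirstPollLatchWorker {
    private final CountDownLatch startupLatch;

    // Only read and written by the worker's own thread, so no synchronization is needed.
    private boolean firstPollCompleted = false;

    FirstPollLatchWorker(final CountDownLatch startupLatch) {
        this.startupLatch = startupLatch;
    }

    // Performs one polling iteration and releases this worker's share of the latch
    // only after the first successful iteration.
    void pollOnce(final Runnable poll) {
        poll.run();
        if (!firstPollCompleted) {
            firstPollCompleted = true;
            startupLatch.countDown();
        }
    }
}
```

With three workers sharing a latch initialized to 3, two fast workers can iterate any number of times and the latch still waits for the third worker's first iteration.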

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/DoNotReplaceResolutionStrategy.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * This strategy will allow fetching a resource if there is no existing resource in the target directory with identical name.
+ */
+public final class DoNotReplaceResolutionStrategy implements ExternalResourceConflictResolutionStrategy {
+
+    @Override
+    public boolean shouldBeFetched(final File targetDirectory, final ExternalResourceDescriptor available) {
+        return !Arrays.stream(targetDirectory.listFiles())
+                .filter(f -> f.getName().equals(available.getLocation()))
+                .findFirst()
+                .isPresent();

Review comment:
       This is quite expensive and not obvious. Any reason to avoid just using:
   `return !new File(targetDirectory, available.getLocation()).exists();`?
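
As a self-contained sketch of the cheaper check (the class name `DoNotReplaceCheck` is hypothetical, and a plain `String` replaces `available.getLocation()`): note that the result is negated, because the resource should be fetched only when no file with that name exists, matching the stream-based version in the diff.

```java
import java.io.File;

// Hypothetical helper showing a direct existence check instead of listing the directory.
final class DoNotReplaceCheck {

    // Fetch only if the target directory does not already contain a file with this name.
    static boolean shouldBeFetched(final File targetDirectory, final String location) {
        return !new File(targetDirectory, location).exists();
    }
}
```

This also sidesteps the `NullPointerException` that `Arrays.stream(targetDirectory.listFiles())` would raise if `listFiles()` returned `null`, for example when the directory is missing.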

##########
File path: nifi-docs/src/main/asciidoc/administration-guide.adoc
##########
@@ -4456,24 +4465,40 @@ Ensure that the file has appropriate permissions for the nifi user and group.
 
 Refresh the browser page and the custom processor should now be available when adding a new Processor to your flow.
 
+[[nar-providers]]
 === NAR Providers
 
-NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using NAR Providers. A NAR Provider serves as a connector between an external data store
-and NiFi.
+NiFi supports fetching NAR files for the autoloading feature from external sources. This can be achieved by using External Resource Providers.
+
+An External Resource Provider serves as a connector between an external data source and NiFi.
 
-When configured, a NAR Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
+When configured, an External Resource Provider polls the external source for available NAR files and offers them to the framework. The framework then fetches new NAR files and copies them to
 the `nifi.nar.library.autoload.directory` for autoloading.
 
-NAR Provider can be configured by adding the `nifi.nar.library.provider.<providerName>.implementation` property with value containing the proper implementation class. Some implementations might need
+By default, the polling will happen in every 5 minutes. It is possible to change this frequency by specifying the property `nifi.nar.library.poll.interval` with the number of milliseconds between polls.
+
+By default NAR files will be downloaded if no file with the same name exists in the folder defined by `nifi.nar.library.autoload.directory`. By setting the `nifi.nar.library.conflict.resolution` other conflict resolution strategies might be applied. Currently, the following strategies are supported:
+
+|===
+| Name                                                               |  Description
+| org.apache.nifi.flow.resource.DoNotReplaceResolutionStrategy       |  Will not replace files: if a file exists in the directory with the same name, it will not be downloaded again.
+| org.apache.nifi.flow.resource.ReplaceWithNewerResolutionStrategy   |  Will replace a file in the target directory if there is an available file in the source but with newer modification date.
+|====
+
+The NAR Provider service might restrain NiFi from startup until the first External Resource collection is not yet executed successfully for every provider. In order to override this behaviour, the `nifi.nar.library.restrain.startup` needs to be declared.

Review comment:
       This first sentence is a bit confusing. Perhaps "... until the first External Resource collection succeeds for every provider"?

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/BufferingExternalResourceProviderWorker.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.apache.nifi.nar.NarCloseable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * It is important to note that this implementation prevents NiFi from catching up a file under writing. For example trying to load a NAR
+ * file which is not fully acquired for example could lead to issues. In order to avoid this, the worker first creates a temporary
+ * file and it will rename it to the expected name only after it has been successfully written to the disk.
+ */
+final class BufferingExternalResourceProviderWorker extends ConflictResolvingExternalResourceProviderWorker {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BufferingExternalResourceProviderWorker.class);
+
+    BufferingExternalResourceProviderWorker(
+        final String namePrefix,
+        final ClassLoader providerClassLoader,
+        final ExternalResourceProvider provider,
+        final ExternalResourceConflictResolutionStrategy resolutionStrategy,
+        final File targetDirectory,
+        final long pollTimeInMs,
+        final CountDownLatch restrainStartupLatch
+    ) {
+        super(namePrefix, providerClassLoader, provider, resolutionStrategy, targetDirectory, pollTimeInMs, restrainStartupLatch);
+    }
+
+    protected void acquireResource(final ExternalResourceDescriptor availableResource) throws IOException {
+        final long startedAt = System.currentTimeMillis();
+        final InputStream inputStream;
+
+        final File targetFile = new File(getTargetDirectory(), availableResource.getLocation());
+        final File bufferFile = new File(getTargetDirectory().getPath() + "/.provider_" + getId() + "_buffer.tmp");
+        final File setAsideFile = new File(getTargetDirectory().getPath() + "/.provider_" + getId() + "_aside.tmp");
+
+        try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getProviderClassLoader())) {
+            inputStream = getProvider().fetchExternalResource(availableResource);
+        }
+
+        if (bufferFile.exists() && !bufferFile.delete()) {
+            throw new ExternalResourceProviderException("Buffer file '" + bufferFile.getName() +"' already exists and cannot be deleted");
+        }
+
+        Files.copy(inputStream, bufferFile.toPath());
+        inputStream.close();

Review comment:
       Need to use a try-with-resources for the InputStream here. If Files.copy() throws an IOException, or if buffer file exists & can't be deleted, this code will have just leaked the InputStream resource.
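
A minimal sketch of the try-with-resources shape being asked for. `SafeResourceCopy` and its `StreamSource` interface are hypothetical; `StreamSource` stands in for `getProvider().fetchExternalResource(...)`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; not the worker class from the pull request.
final class SafeResourceCopy {

    // Stand-in for the provider call that opens the remote resource.
    interface StreamSource {
        InputStream open() throws IOException;
    }

    // The stream is closed whether the cleanup or the copy succeeds or throws.
    static void copyToBuffer(final StreamSource source, final Path bufferFile) throws IOException {
        try (InputStream inputStream = source.open()) {
            // Throws an IOException if a leftover buffer file exists but cannot be deleted.
            Files.deleteIfExists(bufferFile);
            Files.copy(inputStream, bufferFile);
        }
    }
}
```

Because both the stale-file cleanup and the copy sit inside the `try` block, neither failure path can leak the stream.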

##########
File path: nifi-api/src/main/java/org/apache/nifi/flow/resource/ExternalResourceProviderInitializationContext.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import javax.net.ssl.SSLContext;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * Contains necessary information for extensions of external resource provider functionality.
+ */
+public interface ExternalResourceProviderInitializationContext {
+
+    /**
+     * @return Returns with the available properties.
+     */
+    Map<String, String> getProperties();
+
+    /**
+     * @return An optional predicate, which if presents might filter out unwanted files from the external source during listing.
+     */
+    default Optional<Predicate<ExternalResourceDescriptor>> getFilter() {
+        return Optional.empty();
+    }
+
+    /**
+     * @return Returns an SSLContext created from NiFi's keystore/truststore
+     */
+    SSLContext getNiFiSSLContext();

Review comment:
       This should probably be `getSSLContext()`. The fact that it comes from NiFi's system keystore/truststore is not relevant, and it's very feasible that in the future we would provide a different keystore/truststore to use, rather than using the "system" ssl context.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/CompositeExternalResourceProviderService.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This service implementation is capable to manage multiple {@code ExternalResourceProviderWorker}
+ * instances all pointing to a different external source, possibly using even different type of providers.
+ */
+final class CompositeExternalResourceProviderService implements ExternalResourceProviderService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CompositeExternalResourceProviderService.class);
+
+    private final String name;
+    private final Set<ExternalResourceProviderWorker> workers = new HashSet<>();
+    private final CountDownLatch restrainStartupLatch;
+    private volatile boolean started = false;
+
+    CompositeExternalResourceProviderService(final String name, final Set<ExternalResourceProviderWorker> workers, final CountDownLatch restrainStartupLatch) {
+        this.name = name;
+        this.workers.addAll(workers);
+        this.restrainStartupLatch = restrainStartupLatch;
+    }
+
+    @Override
+    public synchronized void start() {
+        if (started) {
+            return;
+        }
+
+        LOGGER.info("Starting External Resource Provider Service ...");
+
+        for (final ExternalResourceProviderWorker worker : workers) {
+            final Thread workerThread = getThread(worker);
+            workerThread.start();
+        }
+
+        try {
+            restrainStartupLatch.await();
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ExternalResourceProviderException("Starting External Resource Provider Service is interrupted");
+        }
+
+        LOGGER.info("External Resource Provider Service is started successfully");
+    }
+
+    @Override
+    public synchronized void stop() {
+        started = false;
+
+        if (workers != null) {
+            workers.forEach(ExternalResourceProviderWorker::stop);
+            workers.clear();
+        }
+
+        LOGGER.info("External Resource Provider Service is stopped");
+    }
+
+    private Thread getThread(final ExternalResourceProviderWorker worker) {

Review comment:
       Perhaps should name this `createThread` rather than `getThread` - `getThread` implies to me that it's just getting an existing value

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/ExternalResourceProviderServiceBuilder.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarProvider;
+import org.apache.nifi.nar.NarThreadContextClassLoader;
+import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
+
+public final class ExternalResourceProviderServiceBuilder {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExternalResourceProviderServiceBuilder.class);
+
+    private static final String IMPLEMENTATION_PROPERTY = "implementation";
+    private static final long DEFAULT_POLL_INTERVAL_MS = 300000;
+
+    private final String serviceName;
+    private final ExtensionManager extensionManager;
+    private final NiFiProperties properties;
+    private final String providerPropertyPrefix;
+
+    private File targetDirectory;
+    private long pollInterval = DEFAULT_POLL_INTERVAL_MS;
+    private String conflictResolutionStrategy;
+    private Optional<Predicate<ExternalResourceDescriptor>> filter = Optional.empty();
+    private boolean restrainStartup = true;
+
+    public ExternalResourceProviderServiceBuilder(
+            final String serviceName,
+            final ExtensionManager extensionManager,
+            final NiFiProperties properties,
+            final String providerPropertyPrefix) {
+        this.serviceName = serviceName;
+        this.extensionManager = extensionManager;
+        this.properties = properties;
+        this.providerPropertyPrefix = providerPropertyPrefix;
+    }
+
+    public ExternalResourceProviderServiceBuilder targetDirectory(final File targetDirectory) {
+        this.targetDirectory = targetDirectory;
+        return this;
+    }
+
+    public ExternalResourceProviderServiceBuilder targetDirectoryProperty(final String propertyName, final String defaultValue) {
+        this.targetDirectory = new File(properties.getProperty(propertyName, defaultValue));
+        return this;
+    }
+
+    public ExternalResourceProviderServiceBuilder conflictResolutionStrategy(final String propertyName, final String defaultValue) {
+        this.conflictResolutionStrategy = properties.getProperty(propertyName, defaultValue);
+        return this;
+    }
+
+    public ExternalResourceProviderServiceBuilder pollInterval(final long pollInterval) {
+        this.pollInterval = pollInterval;
+        return this;
+    }
+
+    public ExternalResourceProviderServiceBuilder pollIntervalProperty(final String propertyName) {
+        return pollIntervalProperty(propertyName, DEFAULT_POLL_INTERVAL_MS);
+    }
+
+    public ExternalResourceProviderServiceBuilder pollIntervalProperty(final String propertyName, final long defaultValue) {
+        this.pollInterval = properties.getLongProperty(propertyName, defaultValue);
+        return this;
+    }

Review comment:
       Not sure that we should be providing property names to the builder and letting it look up a property from some separate properties object. This is a leaky abstraction.
   Instead, we should provide the value to use directly to the builder. If we want to fetch that value from a NiFiProperties object, that's fine, but the NiFiProperties object shouldn't be provided to the builder itself.
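
A minimal sketch of what that could look like. Everything here is hypothetical: `ProviderServiceSettings`, `ProviderServiceSettingsFactory`, the `target.directory` and `poll.interval.ms` property names, and the `IGNORE` default are assumptions for illustration, and `java.util.Properties` stands in for `NiFiProperties` so the example stays self-contained. The point is only that the builder sees values, while the caller owns the property lookup.

```java
import java.io.File;
import java.util.Properties;

// Hypothetical, simplified stand-in for the builder in this PR: it only knows about values.
final class ProviderServiceSettings {
    final File targetDirectory;
    final long pollIntervalMillis;
    final String conflictResolution;

    private ProviderServiceSettings(final Builder builder) {
        this.targetDirectory = builder.targetDirectory;
        this.pollIntervalMillis = builder.pollIntervalMillis;
        this.conflictResolution = builder.conflictResolution;
    }

    static final class Builder {
        private File targetDirectory;
        private long pollIntervalMillis = 300_000L;   // same value as DEFAULT_POLL_INTERVAL_MS in the PR
        private String conflictResolution = "IGNORE"; // placeholder default, an assumption

        Builder targetDirectory(final File targetDirectory) {
            this.targetDirectory = targetDirectory;
            return this;
        }

        Builder pollInterval(final long pollIntervalMillis) {
            this.pollIntervalMillis = pollIntervalMillis;
            return this;
        }

        Builder conflictResolution(final String conflictResolution) {
            this.conflictResolution = conflictResolution;
            return this;
        }

        ProviderServiceSettings build() {
            if (targetDirectory == null) {
                throw new IllegalStateException("Target directory must be set");
            }
            return new ProviderServiceSettings(this);
        }
    }
}

// The caller resolves properties and hands plain values to the builder.
final class ProviderServiceSettingsFactory {
    static ProviderServiceSettings fromProperties(final Properties properties, final String prefix) {
        // Property names below are made up for this example.
        final String directory = properties.getProperty(prefix + "target.directory", "./extensions");
        final long pollInterval = Long.parseLong(properties.getProperty(prefix + "poll.interval.ms", "300000"));

        return new ProviderServiceSettings.Builder()
                .targetDirectory(new File(directory))
                .pollInterval(pollInterval)
                .build();
    }
}
```

With this split the builder can be unit tested without any properties object at all.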

##########
File path: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/flow/resource/hadoop/HDFSExternalResourceProvider.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource.hadoop;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.flow.resource.ExternalResourceDescriptor;
+import org.apache.nifi.flow.resource.ExternalResourceProvider;
+import org.apache.nifi.flow.resource.ExternalResourceProviderInitializationContext;
+import org.apache.nifi.flow.resource.ImmutableExternalResourceDescriptor;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
+import org.apache.nifi.processors.hadoop.HdfsResources;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+@RequiresInstanceClassLoading(cloneAncestorResources = true)
+public class HDFSExternalResourceProvider implements ExternalResourceProvider {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HDFSExternalResourceProvider.class);
+
+    private static final String RESOURCES_PARAMETER = "resources";
+    private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory";
+    private static final String STORAGE_LOCATION = "storage.location";
+    private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal";
+    private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab";
+    private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password";
+    private static final Object RESOURCES_LOCK = new Object();
+    private static final String STORAGE_LOCATION_PROPERTY = "fs.defaultFS";
+    private static final int BUFFER_SIZE_DEFAULT = 4096;
+    private static final String DELIMITER = "/";
+
+    private volatile List<String> resources = null;
+    private volatile Path sourceDirectory = null;
+    private volatile String storageLocation = null;
+
+    private volatile ExternalResourceProviderInitializationContext context;
+    private volatile boolean initialized = false;
+
+    @Override
+    public void initialize(final ExternalResourceProviderInitializationContext context) {
+        resources = Arrays.stream(Objects.requireNonNull(
+                context.getProperties().get(RESOURCES_PARAMETER)).split(",")).map(s -> s.trim()).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+
+        if (resources.isEmpty()) {
+            throw new IllegalArgumentException("At least one HDFS configuration resource is necessary");
+        }
+
+        final String sourceDirectory = context.getProperties().get(SOURCE_DIRECTORY_PARAMETER);
+
+        if (sourceDirectory == null || sourceDirectory.isEmpty()) {
+            throw new IllegalArgumentException("Provider needs the source directory to be set");
+        }
+
+        this.sourceDirectory = new Path(sourceDirectory);
+        this.storageLocation = context.getProperties().get(STORAGE_LOCATION);
+
+        this.context = context;
+        this.initialized = true;
+    }
+
+    @Override
+    public Collection<ExternalResourceDescriptor> listResources() throws IOException {
+        if (!initialized) {
+            LOGGER.error("Provider is not initialized");

Review comment:
       This should probably throw an Exception rather than logging an error and moving on. As written, the calls that follow still run against an uninitialized provider.
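
A rough sketch of the fail-fast version, using a hypothetical, stripped-down `FailFastProvider` rather than the real `HDFSExternalResourceProvider`. The choice of `IllegalStateException` is an assumption; any exception that stops the call would serve the same purpose.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, simplified provider that refuses to work before initialize() has been called.
final class FailFastProvider {
    private volatile boolean initialized = false;

    void initialize() {
        initialized = true;
    }

    Collection<String> listResources() {
        ensureInitialized();
        // The real provider would list files here; the sketch returns nothing.
        return Collections.emptyList();
    }

    private void ensureInitialized() {
        if (!initialized) {
            // Stop here instead of logging and continuing with null fields.
            throw new IllegalStateException("Provider is not initialized");
        }
    }
}
```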

##########
File path: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
##########
@@ -140,6 +140,12 @@
             <artifactId>nifi-record</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-framework-external-resource-utils</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review comment:
       We should not have anything in nifi-hadoop-bundle depending on anything in nifi-framework. Anything that is needed by nifi-hadoop-bundle (or any implementation of a provider, for that matter) should exist in `nifi-api` or in a util that lives within `nifi-commons`. May need to break up the `nifi-framework-external-resource-utils` into multiple modules.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-server-nar/pom.xml
##########
@@ -42,6 +42,11 @@
             <artifactId>nifi-framework-nar-loading-utils</artifactId>
             <version>1.16.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-framework-external-resource-utils</artifactId>
+            <version>1.16.0-SNAPSHOT</version>
+        </dependency>

Review comment:
       I don't see anything updated to use this in the nifi-server-nar. Is this necessary?

##########
File path: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/flow/resource/hadoop/HDFSExternalResourceProvider.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource.hadoop;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.flow.resource.ExternalResourceDescriptor;
+import org.apache.nifi.flow.resource.ExternalResourceProvider;
+import org.apache.nifi.flow.resource.ExternalResourceProviderInitializationContext;
+import org.apache.nifi.flow.resource.ImmutableExternalResourceDescriptor;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
+import org.apache.nifi.processors.hadoop.HdfsResources;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.SocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+@RequiresInstanceClassLoading(cloneAncestorResources = true)
+public class HDFSExternalResourceProvider implements ExternalResourceProvider {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HDFSExternalResourceProvider.class);
+
+    private static final String RESOURCES_PARAMETER = "resources";
+    private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory";
+    private static final String STORAGE_LOCATION = "storage.location";
+    private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal";
+    private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab";
+    private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password";
+    private static final Object RESOURCES_LOCK = new Object();
+    private static final String STORAGE_LOCATION_PROPERTY = "fs.defaultFS";
+    private static final int BUFFER_SIZE_DEFAULT = 4096;
+    private static final String DELIMITER = "/";
+
+    private volatile List<String> resources = null;
+    private volatile Path sourceDirectory = null;
+    private volatile String storageLocation = null;
+
+    private volatile ExternalResourceProviderInitializationContext context;
+    private volatile boolean initialized = false;
+
+    @Override
+    public void initialize(final ExternalResourceProviderInitializationContext context) {
+        resources = Arrays.stream(Objects.requireNonNull(
+                context.getProperties().get(RESOURCES_PARAMETER)).split(",")).map(s -> s.trim()).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+
+        if (resources.isEmpty()) {
+            throw new IllegalArgumentException("At least one HDFS configuration resource is necessary");
+        }
+
+        final String sourceDirectory = context.getProperties().get(SOURCE_DIRECTORY_PARAMETER);
+
+        if (sourceDirectory == null || sourceDirectory.isEmpty()) {
+            throw new IllegalArgumentException("Provider needs the source directory to be set");
+        }
+
+        this.sourceDirectory = new Path(sourceDirectory);
+        this.storageLocation = context.getProperties().get(STORAGE_LOCATION);
+
+        this.context = context;
+        this.initialized = true;
+    }
+
+    @Override
+    public Collection<ExternalResourceDescriptor> listResources() throws IOException {
+        if (!initialized) {
+            LOGGER.error("Provider is not initialized");
+        }
+
+        final HdfsResources hdfsResources = getHdfsResources();
+
+        try {
+
+            final FileStatus[] fileStatuses = hdfsResources.getUserGroupInformation()
+                    .doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfsResources.getFileSystem().listStatus(sourceDirectory));
+
+            final List<ExternalResourceDescriptor> result = Arrays.stream(fileStatuses)
+                    .filter(fileStatus -> fileStatus.isFile())
+                    .map(HDFSExternalResourceProvider::convertStatusToDescriptor)
+                    .filter(context.getFilter().orElseGet(() -> flowResourceDescriptor -> true))
+                    .collect(Collectors.toList());
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("The following NARs were found: " + String.join(", ", result.stream().map(d -> d.getLocation()).collect(Collectors.toList())));
+            }
+
+            return result;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Provider cannot list resources", e);
+        }
+    }
+
+    private static ExternalResourceDescriptor convertStatusToDescriptor(final FileStatus fileStatus) {
+        return new ImmutableExternalResourceDescriptor(fileStatus.getPath().getName(), fileStatus.getModificationTime());
+    }
+
+    @Override
+    public InputStream fetchExternalResource(final ExternalResourceDescriptor descriptor) throws IOException {
+        if (!initialized) {
+            LOGGER.error("Provider is not initialized");
+        }
+
+        final String location = descriptor.getLocation();
+        final Path path = getLocation(location);
+        final HdfsResources hdfsResources = getHdfsResources();
+
+        try {
+            if (hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> !hdfsResources.getFileSystem().exists(path))) {
+                throw new IOException("Provider cannot find " + location);
+            }
+
+            return hdfsResources.getUserGroupInformation()
+                    .doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT));

Review comment:
       Any reason not to combine this into a single call to `doAs`?
   ```
   return hdfsResources.getUserGroupInformation().doAs((PrivilegedExceptionAction<FSDataInputStream>) () -> {
       if (!hdfsResources.getFileSystem().exists(path)) {
           throw new IOException("Cannot find file in HDFS at location " + location);
       }
   
       return hdfsResources.getFileSystem().open(path, BUFFER_SIZE_DEFAULT);
   });
   ```
   Or honestly, perhaps just drop the check for whether the file exists to begin with, as the call to `FileSystem.open()` should already throw an appropriate IOException.

##########
File path: nifi-api/src/main/java/org/apache/nifi/flow/resource/ExternalResourceDescriptor.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+/**
+ * Describes an available resource might be fetched from the external source.
+ */
+public interface ExternalResourceDescriptor {
+
+    /**
+     * @return The location of the resource, where the format depends on the actual provider implementation.
+     */
+    String getLocation();
+
+    /**
+     * @return Returns the modification time of the original resource file using Unix timestamp format.
+     */
+    long getModifiedAt();

Review comment:
       `getLastModified()` might make more sense than `getModifiedAt()` - it's a fairly well recognized convention

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/resources/META-INF/services/org.apache.nifi.flow.resource.ExternalResourceConflictResolutionStrategy
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.flow.resource.DoNotReplaceResolutionStrategy
+org.apache.nifi.flow.resource.ReplaceWithNewerResolutionStrategy

Review comment:
       This feels like overkill to me. This is a strategy for how to deal with conflicts. As mentioned above in the docs, a simple String value of `REPLACE` or `IGNORE` is a lot easier for the user to configure. It also means that we can avoid using ServiceLoader mechanisms, etc. and just support the values we expect - and then just create a new instance of either `ReplaceWithNewer` or `DoNotReplace`.
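
One possible shape for this, sketched as an enum rather than two separate classes. The `ConflictResolution` enum, its method names, and the case-insensitive parsing are assumptions for illustration. The `IGNORE` behavior (fetch only when no local copy exists) is my reading of `DoNotReplace`.

```java
import java.util.Locale;

// Hypothetical enum; REPLACE and IGNORE follow the configuration values suggested above.
enum ConflictResolution {
    REPLACE {
        @Override
        boolean shouldBeFetched(final boolean localExists, final long localLastModified, final long remoteLastModified) {
            // Fetch when there is no local copy, or the remote copy is newer.
            return !localExists || localLastModified < remoteLastModified;
        }
    },
    IGNORE {
        @Override
        boolean shouldBeFetched(final boolean localExists, final long localLastModified, final long remoteLastModified) {
            // Never overwrite an existing local copy.
            return !localExists;
        }
    };

    abstract boolean shouldBeFetched(boolean localExists, long localLastModified, long remoteLastModified);

    // Maps the configured String to a strategy and rejects anything unexpected.
    static ConflictResolution fromConfiguredValue(final String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Conflict resolution strategy must be configured");
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (final IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported conflict resolution strategy: " + value, e);
        }
    }
}
```

No ServiceLoader file or extension discovery is needed for this; an unknown value fails at startup with a clear message.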

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/BufferingExternalResourceProviderWorker.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.apache.nifi.nar.NarCloseable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * It is important to note that this implementation prevents NiFi from catching up a file under writing. For example trying to load a NAR
+ * file which is not fully acquired for example could lead to issues. In order to avoid this, the worker first creates a temporary
+ * file and it will rename it to the expected name only after it has been successfully written to the disk.
+ */
+final class BufferingExternalResourceProviderWorker extends ConflictResolvingExternalResourceProviderWorker {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BufferingExternalResourceProviderWorker.class);
+
+    BufferingExternalResourceProviderWorker(
+        final String namePrefix,
+        final ClassLoader providerClassLoader,
+        final ExternalResourceProvider provider,
+        final ExternalResourceConflictResolutionStrategy resolutionStrategy,
+        final File targetDirectory,
+        final long pollTimeInMs,
+        final CountDownLatch restrainStartupLatch
+    ) {
+        super(namePrefix, providerClassLoader, provider, resolutionStrategy, targetDirectory, pollTimeInMs, restrainStartupLatch);
+    }
+
+    protected void acquireResource(final ExternalResourceDescriptor availableResource) throws IOException {
+        final long startedAt = System.currentTimeMillis();
+        final InputStream inputStream;
+
+        final File targetFile = new File(getTargetDirectory(), availableResource.getLocation());
+        final File bufferFile = new File(getTargetDirectory().getPath() + "/.provider_" + getId() + "_buffer.tmp");
+        final File setAsideFile = new File(getTargetDirectory().getPath() + "/.provider_" + getId() + "_aside.tmp");

Review comment:
       I'm not sure what a "buffer file" and a "set aside file" are. But given the way they are used I believe the intent is for them to be used as a temporary file to store the data in, and a backup. Perhaps they should be renamed as such - `tempFile` and `backupFile`
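
To make the two roles concrete, here is a sketch of how I read the intent. The quoted hunk ends before the files are actually used, so the sequence below is an assumption, and `SafeResourceWriter` is a hypothetical helper rather than the PR's worker class.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating the tempFile / backupFile roles.
final class SafeResourceWriter {
    static void write(final InputStream source, final File targetFile, final File tempFile, final File backupFile) throws IOException {
        // 1. Download into the temporary file, so a partial download never carries the final name.
        Files.copy(source, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);

        // 2. Keep the previous version, if any, as a backup.
        final boolean hadPrevious = targetFile.exists();
        if (hadPrevious) {
            Files.move(targetFile.toPath(), backupFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }

        try {
            // 3. Promote the temporary file to its final name.
            Files.move(tempFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        } catch (final IOException e) {
            // 4. Restore the previous version if the promotion failed.
            if (hadPrevious) {
                Files.move(backupFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
            throw e;
        }

        // 5. Promotion succeeded, so the backup is no longer needed.
        Files.deleteIfExists(backupFile.toPath());
    }
}
```

With the names `tempFile` and `backupFile`, each step reads without needing extra explanation.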

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/ReplaceWithNewerResolutionStrategy.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * This strategy allows to replace the already acquired external resource if the available is newer, based on the modification time.
+ *
+ * This strategy assumes that the external source maintains the modification time in a proper manner and the local files are
+ * not modified by other parties.
+ */
+public final class ReplaceWithNewerResolutionStrategy implements ExternalResourceConflictResolutionStrategy {
+
+    @Override
+    public boolean shouldBeFetched(final File targetDirectory, final ExternalResourceDescriptor available) {
+        return !Arrays.stream(targetDirectory.listFiles())
+                .filter(f -> f.getName().equals(available.getLocation()))
+                .filter(f -> f.lastModified() >= available.getModifiedAt())
+                .findFirst()
+                .isPresent();

Review comment:
       I think this is also a lot more complex than it needs to be. We can just create the file object and compare last modified:
   ```
   final File file = new File(targetDirectory, available.getLocation());
   return !file.exists() || file.lastModified() < available.getModifiedAt();
   ```

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/StandardExtensionDiscoveringManager.java
##########
@@ -111,6 +113,8 @@ public StandardExtensionDiscoveringManager(final Collection<Class<? extends Conf
         definitionMap.put(StateProvider.class, new HashSet<>());
         definitionMap.put(StatusAnalyticsModel.class, new HashSet<>());
         definitionMap.put(NarProvider.class, new HashSet<>());
+        definitionMap.put(ExternalResourceProvider.class, new HashSet<>());
+        definitionMap.put(ExternalResourceConflictResolutionStrategy.class, new HashSet<>());

Review comment:
       As mentioned above, should avoid introducing this notion of ExternalResourceConflictResolutionStrategy when it can more easily be provided as just a simple configuration option

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-external-resource-utils/src/main/java/org/apache/nifi/flow/resource/BufferingExternalResourceProviderWorker.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flow.resource;
+
+import org.apache.nifi.nar.NarCloseable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * It is important to note that this implementation prevents NiFi from catching up a file under writing. For example trying to load a NAR
+ * file which is not fully acquired for example could lead to issues. In order to avoid this, the worker first creates a temporary
+ * file and it will rename it to the expected name only after it has been successfully written to the disk.
+ */
+final class BufferingExternalResourceProviderWorker extends ConflictResolvingExternalResourceProviderWorker {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BufferingExternalResourceProviderWorker.class);
+
+    BufferingExternalResourceProviderWorker(
+        final String namePrefix,
+        final ClassLoader providerClassLoader,
+        final ExternalResourceProvider provider,
+        final ExternalResourceConflictResolutionStrategy resolutionStrategy,
+        final File targetDirectory,
+        final long pollTimeInMs,
+        final CountDownLatch restrainStartupLatch
+    ) {
+        super(namePrefix, providerClassLoader, provider, resolutionStrategy, targetDirectory, pollTimeInMs, restrainStartupLatch);
+    }
+
+    protected void acquireResource(final ExternalResourceDescriptor availableResource) throws IOException {
+        final long startedAt = System.currentTimeMillis();
+        final InputStream inputStream;
+
+        final File targetFile = new File(getTargetDirectory(), availableResource.getLocation());

Review comment:
       The docs for the `ExternalResourceDescriptor` say that `getLocation()`'s format depends on the provider. So for all we know, it may be some complex JSON object or a URL. Trying to create a File based on this has a lot of potential to create problems. We should probably instead add a `getFilename()` method on `ExternalResourceDescriptor` in addition to just `getLocation()`
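
A sketch of how the worker might use such a method. `ResourceDescriptor` and `TargetFiles` are hypothetical stand-ins, `getFilename()` is the method proposed above, and the validation step is an extra assumption on my part rather than something the PR or this comment requires.

```java
import java.io.File;

// Hypothetical descriptor shape with the proposed getFilename() method.
interface ResourceDescriptor {
    String getLocation(); // provider-specific: may be a URL, a path, or something else entirely
    String getFilename(); // plain file name to use in the local target directory
}

final class TargetFiles {
    static File resolve(final File targetDirectory, final ResourceDescriptor descriptor) {
        final String filename = descriptor.getFilename();

        // Reject empty names and anything that is not a single path element.
        if (filename == null || filename.isEmpty()
                || filename.equals(".") || filename.equals("..")
                || !new File(filename).getName().equals(filename)) {
            throw new IllegalArgumentException("Invalid file name: " + filename);
        }

        return new File(targetDirectory, filename);
    }
}
```

The local file name then no longer depends on how a given provider chooses to format its location.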



