You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/10/13 23:38:23 UTC

[nifi] 02/02: NIFI-12160 Kafka Connect: Check for NAR unpacking before starting

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit b2e3898e171b89e25b5e06f6ee849bb3edabbf9a
Author: Peter Gyori <pg...@apache.org>
AuthorDate: Tue Oct 3 14:42:29 2023 +0200

    NIFI-12160 Kafka Connect: Check for NAR unpacking before starting
    
    Check that required NAR files are unpacked completely before starting the Kafka Connector
    
    This closes #7832
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../kafka/connect/WorkingDirectoryUtilsTest.java   | 222 +++++++++++++++++++++
 .../kafka/connect/StatelessKafkaConnectorUtil.java |  10 +-
 .../nifi/kafka/connect/WorkingDirectoryUtils.java  | 100 ++++++++++
 3 files changed, 329 insertions(+), 3 deletions(-)

diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java
new file mode 100644
index 0000000000..e98fcaabb4
--- /dev/null
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.io.CleanupMode.ALWAYS;
+
+/**
+ * Unit tests for {@link WorkingDirectoryUtils}: recursive deletion via purgeDirectory,
+ * purging of incompletely unpacked NAR directories, and full working-directory reconciliation.
+ */
+public class WorkingDirectoryUtilsTest {
+
+    // purgeDirectory on a path that does not exist must be a safe no-op (no exception thrown).
+    @Test
+    public void testDeleteNonexistentFile(@TempDir(cleanup = ALWAYS) File tempDir) {
+        File nonexistentFile = new File(tempDir, "testFile");
+
+        WorkingDirectoryUtils.purgeDirectory(nonexistentFile);
+
+        assertFalse(nonexistentFile.exists());
+    }
+
+    // purgeDirectory also accepts a regular (non-directory) file and deletes it.
+    @Test
+    public void testDeleteFlatFile(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File file = new File(tempDir, "testFile");
+        file.createNewFile();
+
+        WorkingDirectoryUtils.purgeDirectory(file);
+
+        assertFalse(file.exists());
+    }
+
+    // Nested subdirectories and files must be removed recursively along with the root.
+    @Test
+    public void testDeleteDirectoryWithContents(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File directory = new File(tempDir, "directory");
+        File subDirectory = new File(directory, "subDirectory");
+        File subDirectoryContent = new File(subDirectory, "subDirectoryContent");
+        File directoryContent = new File(directory, "directoryContent");
+
+        directory.mkdir();
+        subDirectory.mkdir();
+        subDirectoryContent.createNewFile();
+        directoryContent.createNewFile();
+
+        WorkingDirectoryUtils.purgeDirectory(directory);
+
+        assertFalse(directory.exists());
+    }
+
+    // An empty root directory is left in place by purgeIncompleteUnpackedNars.
+    @Test
+    public void testPurgeUnpackedNarsEmptyRootDirectory(@TempDir(cleanup = ALWAYS) File tempDir) {
+        File rootDirectory = new File(tempDir, "rootDirectory");
+
+        rootDirectory.mkdir();
+
+        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);
+
+        assertTrue(rootDirectory.exists());
+    }
+
+    // Plain files in the root (no "nar-unpacked" directories at all) must survive untouched.
+    @Test
+    public void testPurgeUnpackedNarsRootDirectoryWithFilesOnly(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File rootDirectory = new File(tempDir, "rootDirectory");
+        File directoryContent1 = new File(rootDirectory, "file1");
+        File directoryContent2 = new File(rootDirectory, "file2");
+
+        rootDirectory.mkdir();
+        directoryContent1.createNewFile();
+        directoryContent2.createNewFile();
+
+        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);
+
+        assertTrue(rootDirectory.exists() && directoryContent1.exists() && directoryContent2.exists());
+    }
+
+    // Only "*nar-unpacked" directories that lack a "nar-digest" marker are purged;
+    // everything else in the tree must remain (see TestDirectoryStructure.isConsistent).
+    @Test
+    public void testPurgeUnpackedNars(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File rootDirectory = new File(tempDir, "rootDirectory");
+        rootDirectory.mkdir();
+        TestDirectoryStructure testDirectoryStructure = new TestDirectoryStructure(rootDirectory);
+
+        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(testDirectoryStructure.getRootDirectory());
+
+        assertTrue(testDirectoryStructure.isConsistent());
+    }
+
+    // reconcileWorkingDirectory only inspects the nar/extensions and extensions subtrees;
+    // sibling files and directories of the working directory must survive unchanged.
+    @Test
+    public void testWorkingDirectoryIntegrityRestored(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        /*
+        workingDirectory
+            - nar
+                - extensions
+                    - *TestDirectoryStructure*
+                - narDirectory
+                - narFile
+            - extensions
+                - *TestDirectoryStructure*
+            - additionalDirectory
+            - workingDirectoryFile
+         */
+        File workingDirectory = new File(tempDir, "workingDirectory");
+        File nar = new File(workingDirectory, "nar");
+        File narExtensions = new File(nar, "extensions");
+        File narDirectory = new File(nar, "narDirectory");
+        File narFile = new File(nar, "narFile");
+        File extensions = new File(workingDirectory, "extensions");
+        File additionalDirectory = new File(workingDirectory, "additionalDirectory");
+        File workingDirectoryFile = new File(workingDirectory, "workingDirectoryFile");
+
+        workingDirectory.mkdir();
+        nar.mkdir();
+        narExtensions.mkdir();
+        narDirectory.mkdir();
+        narFile.createNewFile();
+        extensions.mkdir();
+        additionalDirectory.mkdir();
+        workingDirectoryFile.createNewFile();
+
+        TestDirectoryStructure narExtensionsStructure = new TestDirectoryStructure(narExtensions);
+        TestDirectoryStructure extensionsStructure = new TestDirectoryStructure(extensions);
+
+        WorkingDirectoryUtils.reconcileWorkingDirectory(workingDirectory);
+
+        assertTrue(workingDirectory.exists()
+                && nar.exists()
+                && narExtensionsStructure.isConsistent()
+                && narDirectory.exists()
+                && narFile.exists()
+                && extensionsStructure.isConsistent()
+                && additionalDirectory.exists()
+                && workingDirectoryFile.exists()
+        );
+    }
+
+    // Fixture: builds a tree containing one complete unpacked-NAR directory (with nar-digest),
+    // one incomplete unpacked-NAR directory (without it), and unrelated content.
+    private class TestDirectoryStructure {
+        /*
+            rootDirectory
+                - subDirectory1-nar-unpacked
+                    - subDirectory1File1
+                    - nar-digest
+                - subDirectory2
+                    - subDirectory2File1
+                - subDirectory3-nar-unpacked
+                    - subDirectory3Dir1
+                        - subDirectory3Dir1File1
+                    - subDirectory3File1
+                - fileInRoot
+         */
+        File rootDirectory;
+        File subDirectory1;
+        File subDirectory2;
+        File subDirectory3;
+        File fileInRoot;
+        File subDirectory1File1;
+        File subDirectory1File2;
+        File subDirectory2File1;
+        File subDirectory3Dir1;
+        File subDirectory3File1;
+        File subDirectory3Dir1File1;
+
+        public TestDirectoryStructure(final File rootDirectory) throws IOException {
+            this.rootDirectory = rootDirectory;
+            subDirectory1 = new File(rootDirectory, "subDirectory1-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
+            // NOTE(review): "subDirector2"/"subDirector3" look like typos for "subDirectory2"/"subDirectory3".
+            // Harmless here, since the strings are only used as on-disk directory names.
+            subDirectory2 = new File(rootDirectory, "subDirector2");
+            subDirectory3 = new File(rootDirectory, "subDirector3-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
+            fileInRoot = new File(rootDirectory, "fileInRoot");
+            subDirectory1File1 = new File(subDirectory1, "subDirectory1File1");
+            // subDirectory1 gets the hash/digest marker file, so it counts as completely unpacked.
+            subDirectory1File2 = new File(subDirectory1, WorkingDirectoryUtils.HASH_FILENAME);
+            subDirectory2File1 = new File(subDirectory2, "subDirectory2File1");
+            subDirectory3Dir1 = new File(subDirectory3, "subDirectory3Dir1");
+            subDirectory3File1 = new File(subDirectory3, "subDirectory3File1");
+            subDirectory3Dir1File1 = new File(subDirectory3Dir1, "subDirectory3Dir1File1");
+
+            subDirectory1.mkdir();
+            subDirectory2.mkdir();
+            subDirectory3.mkdir();
+            fileInRoot.createNewFile();
+            subDirectory1File1.createNewFile();
+            subDirectory1File2.createNewFile();
+            subDirectory2File1.createNewFile();
+            subDirectory3File1.createNewFile();
+            subDirectory3Dir1.mkdir();
+            subDirectory3Dir1File1.createNewFile();
+        }
+
+        public File getRootDirectory() {
+            return rootDirectory;
+        }
+
+        /**
+         * Checks if all directories ending in 'nar-unpacked' that have a file named 'nar-digest' within still exist,
+         * and the directory ending in 'nar-unpacked' without 'nar-digest' has been removed with all of its contents.
+         * @return true if the above is met.
+         */
+        public boolean isConsistent() {
+            return (rootDirectory.exists()
+                    && subDirectory1.exists() && subDirectory1File1.exists() && subDirectory1File2.exists()
+                    && subDirectory2.exists() && subDirectory2File1.exists()
+                    && !(subDirectory3.exists() || subDirectory3Dir1.exists() || subDirectory3File1.exists() || subDirectory3Dir1File1.exists())
+                    && fileInRoot.exists());
+        }
+    }
+
+}
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index dcc138c977..5554217f42 100644
--- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -84,6 +84,7 @@ public class StatelessKafkaConnectorUtil {
             config.setFlowDefinition(dataflowDefinitionProperties);
             dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_FLOW_NAME, dataflowName);
             MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
+            StatelessDataflow dataflow;
 
             // Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
             // We do this because the bootstrap() method will expand all NAR files into the working directory.
@@ -91,13 +92,16 @@ public class StatelessKafkaConnectorUtil {
             // unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
             unpackNarLock.lock();
             try {
+                WorkingDirectoryUtils.reconcileWorkingDirectory(engineConfiguration.getWorkingDirectory());
+
                 bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
+
+                dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
+                dataflow = bootstrap.createDataflow(dataflowDefinition);
             } finally {
                 unpackNarLock.unlock();
             }
-
-            dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
-            return bootstrap.createDataflow(dataflowDefinition);
+            return dataflow;
         } catch (final Exception e) {
             throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
         }
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java
new file mode 100644
index 0000000000..1453267a30
--- /dev/null
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * Utility methods for restoring the integrity of the Stateless NiFi working directory
+ * before the Kafka Connector starts: unpacked-NAR directories that are missing their
+ * "nar-digest" marker file are deleted so that unpacking can be retried cleanly.
+ */
+public class WorkingDirectoryUtils {
+
+    // Name suffix of directories produced by NAR unpacking; only such directories are inspected.
+    protected static final String NAR_UNPACKED_SUFFIX = "nar-unpacked";
+    // Marker file inside an unpacked NAR directory; its presence is treated as proof that unpacking completed.
+    protected static final String HASH_FILENAME = "nar-digest";
+    private static final Logger logger = LoggerFactory.getLogger(WorkingDirectoryUtils.class);
+
+    /**
+     * Goes through the nar/extensions and extensions directories within the working directory
+     * and deletes every directory whose name ends in "nar-unpacked" and does not have a
+     * "nar-digest" file in it.
+     * @param workingDirectory File object pointing to the working directory.
+     */
+    public static void reconcileWorkingDirectory(final File workingDirectory) {
+        purgeIncompleteUnpackedNars(new File(new File(workingDirectory, "nar"), "extensions"));
+        purgeIncompleteUnpackedNars(new File(workingDirectory, "extensions"));
+    }
+
+    /**
+     * Receives a directory as parameter and goes through every directory within it that ends in
+     * "nar-unpacked". If a directory ending in "nar-unpacked" does not have a file named
+     * "nar-digest" within it, it gets deleted with all of its contents.
+     * @param directory A File object pointing to the directory that is supposed to contain
+     *                  further directories whose name ends in "nar-unpacked".
+     */
+    public static void purgeIncompleteUnpackedNars(final File directory) {
+        // listFiles returns null when the directory does not exist or cannot be listed;
+        // both cases are treated the same as an empty result.
+        final File[] unpackedDirs = directory.listFiles(file -> file.isDirectory() && file.getName().endsWith(NAR_UNPACKED_SUFFIX));
+        if (unpackedDirs == null || unpackedDirs.length == 0) {
+            logger.debug("Found no unpacked NARs in {}", directory);
+            // Guarded separately: Arrays.deepToString of the full listing is only built when debug is enabled.
+            if (logger.isDebugEnabled()) {
+                logger.debug("Directory contains: {}", Arrays.deepToString(directory.listFiles()));
+            }
+            return;
+        }
+
+        for (final File unpackedDir : unpackedDirs) {
+            // The presence of the hash/digest file marks the directory as completely unpacked.
+            final File narHashFile = new File(unpackedDir, HASH_FILENAME);
+            if (narHashFile.exists()) {
+                logger.debug("Already successfully unpacked {}", unpackedDir);
+            } else {
+                purgeDirectory(unpackedDir);
+            }
+        }
+    }
+
+    /**
+     * Delete a directory with all of its contents.
+     * @param directory The directory to be deleted.
+     */
+    public static void purgeDirectory(final File directory) {
+        if (directory.exists()) {
+            deleteRecursively(directory);
+            logger.debug("Cleaned up {}", directory);
+        }
+    }
+
+    // Depth-first deletion: children are removed before the directory itself.
+    private static void deleteRecursively(final File fileOrDirectory) {
+        if (fileOrDirectory.isDirectory()) {
+            final File[] files = fileOrDirectory.listFiles();
+            if (files != null) {
+                for (final File file : files) {
+                    deleteRecursively(file);
+                }
+            }
+        }
+        deleteQuietly(fileOrDirectory);
+    }
+
+    // Best-effort deletion: a failure is only logged at debug level, never thrown.
+    private static void deleteQuietly(final File file) {
+        final boolean deleted = file.delete();
+        if (!deleted) {
+            logger.debug("Failed to cleanup temporary file {}", file);
+        }
+    }
+
+}