You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/10/13 23:38:23 UTC
[nifi] 02/02: NIFI-12160 Kafka Connect: Check for NAR unpacking before starting
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b2e3898e171b89e25b5e06f6ee849bb3edabbf9a
Author: Peter Gyori <pg...@apache.org>
AuthorDate: Tue Oct 3 14:42:29 2023 +0200
NIFI-12160 Kafka Connect: Check for NAR unpacking before starting
Check that required NAR files are unpacked completely before starting the Kafka Connector
This closes #7832
Signed-off-by: David Handermann <ex...@apache.org>
---
.../kafka/connect/WorkingDirectoryUtilsTest.java | 222 +++++++++++++++++++++
.../kafka/connect/StatelessKafkaConnectorUtil.java | 10 +-
.../nifi/kafka/connect/WorkingDirectoryUtils.java | 100 ++++++++++
3 files changed, 329 insertions(+), 3 deletions(-)
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java
new file mode 100644
index 0000000000..e98fcaabb4
--- /dev/null
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.io.CleanupMode.ALWAYS;
+
+/**
+ * Tests for {@link WorkingDirectoryUtils}: recursive deletion of files and directories, and purging of
+ * unpacked NAR directories that lack the digest file.
+ */
+public class WorkingDirectoryUtilsTest {
+
+    @Test
+    public void testDeleteNonexistentFile(@TempDir(cleanup = ALWAYS) File tempDir) {
+        File nonexistentFile = new File(tempDir, "testFile");
+
+        WorkingDirectoryUtils.purgeDirectory(nonexistentFile);
+
+        assertFalse(nonexistentFile.exists());
+    }
+
+    @Test
+    public void testDeleteFlatFile(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File file = new File(tempDir, "testFile");
+        createFile(file);
+
+        WorkingDirectoryUtils.purgeDirectory(file);
+
+        assertFalse(file.exists());
+    }
+
+    @Test
+    public void testDeleteDirectoryWithContents(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File directory = new File(tempDir, "directory");
+        File subDirectory = new File(directory, "subDirectory");
+        File subDirectoryContent = new File(subDirectory, "subDirectoryContent");
+        File directoryContent = new File(directory, "directoryContent");
+
+        createDirectory(directory);
+        createDirectory(subDirectory);
+        createFile(subDirectoryContent);
+        createFile(directoryContent);
+
+        WorkingDirectoryUtils.purgeDirectory(directory);
+
+        assertFalse(directory.exists());
+    }
+
+    @Test
+    public void testPurgeUnpackedNarsEmptyRootDirectory(@TempDir(cleanup = ALWAYS) File tempDir) {
+        File rootDirectory = new File(tempDir, "rootDirectory");
+
+        createDirectory(rootDirectory);
+
+        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);
+
+        assertTrue(rootDirectory.exists());
+    }
+
+    @Test
+    public void testPurgeUnpackedNarsRootDirectoryWithFilesOnly(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File rootDirectory = new File(tempDir, "rootDirectory");
+        File directoryContent1 = new File(rootDirectory, "file1");
+        File directoryContent2 = new File(rootDirectory, "file2");
+
+        createDirectory(rootDirectory);
+        createFile(directoryContent1);
+        createFile(directoryContent2);
+
+        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);
+
+        // Separate assertions so that a failure names the path that went missing.
+        assertTrue(rootDirectory.exists());
+        assertTrue(directoryContent1.exists());
+        assertTrue(directoryContent2.exists());
+    }
+
+    @Test
+    public void testPurgeUnpackedNars(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        File rootDirectory = new File(tempDir, "rootDirectory");
+        createDirectory(rootDirectory);
+        TestDirectoryStructure testDirectoryStructure = new TestDirectoryStructure(rootDirectory);
+
+        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(testDirectoryStructure.getRootDirectory());
+
+        assertTrue(testDirectoryStructure.isConsistent());
+    }
+
+    @Test
+    public void testWorkingDirectoryIntegrityRestored(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
+        /*
+            workingDirectory
+                - nar
+                    - extensions
+                        - *TestDirectoryStructure*
+                    - narDirectory
+                    - narFile
+                - extensions
+                    - *TestDirectoryStructure*
+                - additionalDirectory
+                - workingDirectoryFile
+         */
+        File workingDirectory = new File(tempDir, "workingDirectory");
+        File nar = new File(workingDirectory, "nar");
+        File narExtensions = new File(nar, "extensions");
+        File narDirectory = new File(nar, "narDirectory");
+        File narFile = new File(nar, "narFile");
+        File extensions = new File(workingDirectory, "extensions");
+        File additionalDirectory = new File(workingDirectory, "additionalDirectory");
+        File workingDirectoryFile = new File(workingDirectory, "workingDirectoryFile");
+
+        createDirectory(workingDirectory);
+        createDirectory(nar);
+        createDirectory(narExtensions);
+        createDirectory(narDirectory);
+        createFile(narFile);
+        createDirectory(extensions);
+        createDirectory(additionalDirectory);
+        createFile(workingDirectoryFile);
+
+        TestDirectoryStructure narExtensionsStructure = new TestDirectoryStructure(narExtensions);
+        TestDirectoryStructure extensionsStructure = new TestDirectoryStructure(extensions);
+
+        WorkingDirectoryUtils.reconcileWorkingDirectory(workingDirectory);
+
+        // Separate assertions so that a failure names the condition that was violated.
+        assertTrue(workingDirectory.exists());
+        assertTrue(nar.exists());
+        assertTrue(narExtensionsStructure.isConsistent());
+        assertTrue(narDirectory.exists());
+        assertTrue(narFile.exists());
+        assertTrue(extensionsStructure.isConsistent());
+        assertTrue(additionalDirectory.exists());
+        assertTrue(workingDirectoryFile.exists());
+    }
+
+    /**
+     * Creates a single directory and fails the test if it could not be created, so that a broken
+     * fixture cannot make a later "does not exist" assertion pass vacuously.
+     */
+    private static void createDirectory(final File directory) {
+        assertTrue(directory.mkdir(), "Failed to create directory " + directory);
+    }
+
+    /**
+     * Creates an empty file and fails the test if it could not be created.
+     */
+    private static void createFile(final File file) throws IOException {
+        assertTrue(file.createNewFile(), "Failed to create file " + file);
+    }
+
+    /**
+     * Fixture that populates a given root directory with a complete unpacked NAR, an unrelated
+     * directory, an incomplete unpacked NAR and a plain file.
+     */
+    private static class TestDirectoryStructure {
+        /*
+            rootDirectory
+                - subDirectory1-nar-unpacked
+                    - subDirectory1File1
+                    - nar-digest
+                - subDirectory2
+                    - subDirectory2File1
+                - subDirectory3-nar-unpacked
+                    - subDirectory3Dir1
+                        - subDirectory3Dir1File1
+                    - subDirectory3File1
+                - fileInRoot
+         */
+        private final File rootDirectory;
+        private final File subDirectory1;
+        private final File subDirectory2;
+        private final File subDirectory3;
+        private final File fileInRoot;
+        private final File subDirectory1File1;
+        private final File subDirectory1File2;
+        private final File subDirectory2File1;
+        private final File subDirectory3Dir1;
+        private final File subDirectory3File1;
+        private final File subDirectory3Dir1File1;
+
+        public TestDirectoryStructure(final File rootDirectory) throws IOException {
+            this.rootDirectory = rootDirectory;
+            subDirectory1 = new File(rootDirectory, "subDirectory1-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
+            subDirectory2 = new File(rootDirectory, "subDirectory2");
+            subDirectory3 = new File(rootDirectory, "subDirectory3-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
+            fileInRoot = new File(rootDirectory, "fileInRoot");
+            subDirectory1File1 = new File(subDirectory1, "subDirectory1File1");
+            subDirectory1File2 = new File(subDirectory1, WorkingDirectoryUtils.HASH_FILENAME);
+            subDirectory2File1 = new File(subDirectory2, "subDirectory2File1");
+            subDirectory3Dir1 = new File(subDirectory3, "subDirectory3Dir1");
+            subDirectory3File1 = new File(subDirectory3, "subDirectory3File1");
+            subDirectory3Dir1File1 = new File(subDirectory3Dir1, "subDirectory3Dir1File1");
+
+            createDirectory(subDirectory1);
+            createDirectory(subDirectory2);
+            createDirectory(subDirectory3);
+            createFile(fileInRoot);
+            createFile(subDirectory1File1);
+            createFile(subDirectory1File2);
+            createFile(subDirectory2File1);
+            createFile(subDirectory3File1);
+            createDirectory(subDirectory3Dir1);
+            createFile(subDirectory3Dir1File1);
+        }
+
+        public File getRootDirectory() {
+            return rootDirectory;
+        }
+
+        /**
+         * Checks if all directories ending in 'nar-unpacked' that have a file named 'nar-digest' within still exist,
+         * and the directory ending in 'nar-unpacked' without 'nar-digest' has been removed with all of its contents.
+         * @return true if the above is met.
+         */
+        public boolean isConsistent() {
+            final boolean completeNarKept = subDirectory1.exists() && subDirectory1File1.exists() && subDirectory1File2.exists();
+            final boolean unrelatedDirectoryKept = subDirectory2.exists() && subDirectory2File1.exists();
+            final boolean incompleteNarRemoved = !(subDirectory3.exists() || subDirectory3Dir1.exists()
+                    || subDirectory3File1.exists() || subDirectory3Dir1File1.exists());
+
+            return rootDirectory.exists()
+                    && completeNarKept
+                    && unrelatedDirectoryKept
+                    && incompleteNarRemoved
+                    && fileInRoot.exists();
+        }
+    }
+
+}
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index dcc138c977..5554217f42 100644
--- a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -84,6 +84,7 @@ public class StatelessKafkaConnectorUtil {
config.setFlowDefinition(dataflowDefinitionProperties);
dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_FLOW_NAME, dataflowName);
MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
+ StatelessDataflow dataflow;
// Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
// We do this because the bootstrap() method will expand all NAR files into the working directory.
@@ -91,13 +92,16 @@ public class StatelessKafkaConnectorUtil {
// unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
unpackNarLock.lock();
try {
+ WorkingDirectoryUtils.reconcileWorkingDirectory(engineConfiguration.getWorkingDirectory());
+
bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
+
+ dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
+ dataflow = bootstrap.createDataflow(dataflowDefinition);
} finally {
unpackNarLock.unlock();
}
-
- dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
- return bootstrap.createDataflow(dataflowDefinition);
+ return dataflow;
} catch (final Exception e) {
throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
}
diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java
new file mode 100644
index 0000000000..1453267a30
--- /dev/null
+++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * Helper methods for removing incompletely unpacked NAR directories from a working directory.
+ * A directory whose name ends in "nar-unpacked" is treated as complete only when it contains
+ * a file named "nar-digest".
+ */
+public class WorkingDirectoryUtils {
+
+    protected static final String NAR_UNPACKED_SUFFIX = "nar-unpacked";
+    protected static final String HASH_FILENAME = "nar-digest";
+    private static final Logger logger = LoggerFactory.getLogger(WorkingDirectoryUtils.class);
+
+    /**
+     * Goes through the nar/extensions and extensions directories within the working directory
+     * and deletes every directory whose name ends in "nar-unpacked" and does not have a
+     * "nar-digest" file in it.
+     * @param workingDirectory File object pointing to the working directory.
+     */
+    public static void reconcileWorkingDirectory(final File workingDirectory) {
+        purgeIncompleteUnpackedNars(new File(new File(workingDirectory, "nar"), "extensions"));
+        purgeIncompleteUnpackedNars(new File(workingDirectory, "extensions"));
+    }
+
+    /**
+     * Receives a directory as parameter and goes through every directory within it that ends in
+     * "nar-unpacked". If a directory ending in "nar-unpacked" does not have a file named
+     * "nar-digest" within it, it gets deleted with all of its contents.
+     * Nothing is deleted when the given directory does not exist, cannot be listed, or has no
+     * matching sub-directories.
+     * @param directory A File object pointing to the directory that is supposed to contain
+     *                  further directories whose name ends in "nar-unpacked".
+     */
+    public static void purgeIncompleteUnpackedNars(final File directory) {
+        // listFiles() returns null when the path is not a directory or cannot be read.
+        final File[] unpackedDirs = directory.listFiles(file -> file.isDirectory() && file.getName().endsWith(NAR_UNPACKED_SUFFIX));
+        if (unpackedDirs == null || unpackedDirs.length == 0) {
+            logger.debug("Found no unpacked NARs in {}", directory);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Directory contains: {}", Arrays.deepToString(directory.listFiles()));
+            }
+            return;
+        }
+
+        for (final File unpackedDir : unpackedDirs) {
+            final File narHashFile = new File(unpackedDir, HASH_FILENAME);
+            if (narHashFile.exists()) {
+                logger.debug("Already successfully unpacked {}", unpackedDir);
+            } else {
+                purgeDirectory(unpackedDir);
+            }
+        }
+    }
+
+    /**
+     * Delete a directory with all of its contents. Deletion is best effort: failures are logged
+     * and never thrown.
+     * @param directory The directory to be deleted.
+     */
+    public static void purgeDirectory(final File directory) {
+        if (!directory.exists()) {
+            return;
+        }
+
+        deleteRecursively(directory);
+
+        // Report success only when the directory is really gone; a leftover incomplete
+        // directory is worth a warning because the purge did not achieve its purpose.
+        if (directory.exists()) {
+            logger.warn("Failed to completely clean up {}", directory);
+        } else {
+            logger.debug("Cleaned up {}", directory);
+        }
+    }
+
+    /**
+     * Deletes the given file, or the given directory after deleting everything below it.
+     * Symbolic links are removed themselves and never followed.
+     */
+    private static void deleteRecursively(final File fileOrDirectory) {
+        // File.isDirectory() follows symbolic links, so without this check the contents of a
+        // link target outside the purged directory would be deleted as well.
+        final boolean symbolicLink = java.nio.file.Files.isSymbolicLink(fileOrDirectory.toPath());
+        if (!symbolicLink && fileOrDirectory.isDirectory()) {
+            final File[] files = fileOrDirectory.listFiles();
+            if (files != null) {
+                for (final File file : files) {
+                    deleteRecursively(file);
+                }
+            }
+        }
+        deleteQuietly(fileOrDirectory);
+    }
+
+    /**
+     * Deletes a single file or empty directory, logging a warning instead of throwing when the
+     * deletion fails.
+     */
+    private static void deleteQuietly(final File file) {
+        final boolean deleted = file.delete();
+        if (!deleted) {
+            logger.warn("Failed to cleanup temporary file {}", file);
+        }
+    }
+
+}