You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/07/12 01:06:17 UTC
[nifi] branch main updated: NIFI-10143 Adjusted UTF-8 auto-detection in FTP Processors
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 274e4feeaa NIFI-10143 Adjusted UTF-8 auto-detection in FTP Processors
274e4feeaa is described below
commit 274e4feeaa4f8173657fe0bfdb66291ef33073ed
Author: Paul Grey <gr...@yahoo.com>
AuthorDate: Mon Jun 27 17:30:26 2022 -0400
NIFI-10143 Adjusted UTF-8 auto-detection in FTP Processors
This closes #6172
Signed-off-by: David Handermann <ex...@apache.org>
---
.../standard/ftp/StandardFTPClientProvider.java | 13 +-
.../nifi/processors/standard/FTPCharsetIT.java | 303 +++++++++++++++++++++
2 files changed, 310 insertions(+), 6 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/StandardFTPClientProvider.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/StandardFTPClientProvider.java
index 726d1e34ea..da2c3a22f7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/StandardFTPClientProvider.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/StandardFTPClientProvider.java
@@ -38,19 +38,19 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
import static org.apache.nifi.processors.standard.util.FTPTransfer.BUFFER_SIZE;
import static org.apache.nifi.processors.standard.util.FTPTransfer.CONNECTION_MODE;
import static org.apache.nifi.processors.standard.util.FTPTransfer.CONNECTION_MODE_ACTIVE;
-import static org.apache.nifi.processors.standard.util.FTPTransfer.DATA_TIMEOUT;
import static org.apache.nifi.processors.standard.util.FTPTransfer.CONNECTION_TIMEOUT;
-import static org.apache.nifi.processors.standard.util.FTPTransfer.UTF8_ENCODING;
-import static org.apache.nifi.processors.standard.util.FTPTransfer.USERNAME;
-import static org.apache.nifi.processors.standard.util.FTPTransfer.PASSWORD;
+import static org.apache.nifi.processors.standard.util.FTPTransfer.DATA_TIMEOUT;
import static org.apache.nifi.processors.standard.util.FTPTransfer.HOSTNAME;
+import static org.apache.nifi.processors.standard.util.FTPTransfer.PASSWORD;
import static org.apache.nifi.processors.standard.util.FTPTransfer.PORT;
import static org.apache.nifi.processors.standard.util.FTPTransfer.TRANSFER_MODE;
import static org.apache.nifi.processors.standard.util.FTPTransfer.TRANSFER_MODE_ASCII;
+import static org.apache.nifi.processors.standard.util.FTPTransfer.USERNAME;
+import static org.apache.nifi.processors.standard.util.FTPTransfer.UTF8_ENCODING;
+import static org.apache.nifi.processors.standard.util.FTPTransfer.createComponentProxyConfigSupplier;
/**
* Standard implementation of FTP Client Provider
@@ -150,11 +150,12 @@ public class StandardFTPClientProvider implements FTPClientProvider {
client.setDataTimeout(dataTimeout);
client.setDefaultTimeout(connectionTimeout);
client.setRemoteVerificationEnabled(false);
+ client.setAutodetectUTF8(true);
final boolean unicodeEnabled = context.getProperty(UTF8_ENCODING).isSet() ? context.getProperty(UTF8_ENCODING).asBoolean() : false;
+ // in non-UTF-8 mode, FTP control encoding should be left as default (ISO-8859-1)
if (unicodeEnabled) {
client.setControlEncoding(StandardCharsets.UTF_8.name());
- client.setAutodetectUTF8(true);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/FTPCharsetIT.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/FTPCharsetIT.java
new file mode 100644
index 0000000000..32ae88011e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/FTPCharsetIT.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.ClearTextPasswordEncryptor;
+import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+/**
+ * Seed an ASCII charset folder in FTP server with files using i18n known filenames. Iterate through a set of i18n folder
+ * names, copying the known content into each new target. Afterwards, verify that the last folder contains all
+ * files with the expected filenames.
+ * <p>
+ * To test against a live FTP server, run test with system property set
+ * <code>TestFTPCharset=hostname,port,user,password</code>, like
+ * <code>TestFTPCharset=localhost,21,ftpuser,ftppassword</code>.
+ */
+@TestMethodOrder(MethodOrderer.MethodName.class)
+public class FTPCharsetIT {
+ private static final String SERVER_OVERRIDE = System.getProperty(FTPCharsetIT.class.getSimpleName());
+ private static final boolean EMBED_FTP_SERVER = (SERVER_OVERRIDE == null);
+ private static FtpServer FTP_SERVER;
+
+ private static final String USE_UTF8 = Boolean.TRUE.toString();
+ private static final String HOSTNAME = "localhost";
+ private static final String PORT = Integer.toString(NetworkUtils.getAvailableTcpPort());
+ private static final String USER = "ftpuser";
+ private static final String PASSWORD = "admin";
+ private static final String TIMEOUT = "3 secs";
+
+ @TempDir
+ private static File FOLDER_FTP;
+ @TempDir
+ private static File FOLDER_USER_PROPERTIES;
+
+ public static Arguments serverParametersProvider() {
+ final String override = System.getProperty(FTPCharsetIT.class.getSimpleName());
+ if (override == null) {
+ return arguments(HOSTNAME, PORT, USER, PASSWORD);
+ } else {
+ return arguments((Object[]) override.split(","));
+ }
+ }
+
+ public static Stream<Arguments> folderNamesProvider() {
+ return Stream.of(
+ arguments("folder1", "folder2"),
+ arguments("folder2", "æøå"),
+ arguments("æøå", "folder3"),
+ arguments("folder3", "اختبار"),
+ arguments("اختبار", "folder4"),
+ arguments("folder4", "Госагїzатїой"),
+ arguments("Госагїzатїой", "folder5"),
+ arguments("folder5", "し回亡丹し工z丹卞工回几"),
+ arguments("し回亡丹し工z丹卞工回几", "folder6")
+ );
+ }
+
+ public static Stream<String> filenamesProvider() {
+ return Stream.of(
+ "1.txt",
+ "æøå.txt",
+ "اختبار.txt",
+ "Госагїzатїой.txt",
+ "し回亡丹し工z丹卞工回几.txt");
+ }
+
+ @BeforeAll
+ static void startEmbeddedServer() throws IOException, FtpException {
+ if (EMBED_FTP_SERVER) {
+ // setup ftp user
+ final Properties userProperties = new Properties();
+ userProperties.setProperty("ftpserver.user.ftpuser.idletime", "0");
+ userProperties.setProperty("ftpserver.user.ftpuser.enableflag", Boolean.TRUE.toString());
+ userProperties.setProperty("ftpserver.user.ftpuser.userpassword", PASSWORD);
+ userProperties.setProperty("ftpserver.user.ftpuser.writepermission", Boolean.TRUE.toString());
+ userProperties.setProperty("ftpserver.user.ftpuser.homedirectory", FOLDER_FTP.getAbsolutePath());
+ final File userPropertiesFile = new File(FOLDER_USER_PROPERTIES, "user.properties");
+ try (final FileOutputStream fos = new FileOutputStream(userPropertiesFile)) {
+ userProperties.store(fos, "ftp-user-properties");
+ }
+ final PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
+ userManagerFactory.setUrl(userPropertiesFile.toURI().toURL());
+ userManagerFactory.setPasswordEncryptor(new ClearTextPasswordEncryptor());
+ final UserManager userManager = userManagerFactory.createUserManager();
+ final BaseUser ftpuser = (BaseUser) userManager.getUserByName(USER);
+ // setup embedded ftp server
+ final FtpServerFactory serverFactory = new FtpServerFactory();
+ serverFactory.setUserManager(userManager);
+ final FileSystemFactory fileSystemFactory = serverFactory.getFileSystem();
+ final FileSystemView view = fileSystemFactory.createFileSystemView(ftpuser);
+ final FtpFile workingDirectory = view.getWorkingDirectory();
+ final Object physicalFile = workingDirectory.getPhysicalFile();
+ assertInstanceOf(File.class, physicalFile);
+ assertEquals(FOLDER_FTP.getAbsolutePath(), ((File) physicalFile).getAbsolutePath());
+ final ListenerFactory factory = new ListenerFactory();
+ factory.setPort(Integer.parseInt(PORT));
+ serverFactory.addListener("default", factory.createListener());
+ FTP_SERVER = serverFactory.createServer();
+ FTP_SERVER.start();
+ }
+ }
+
+ @AfterAll
+ static void stopEmbeddedServer() {
+ if (EMBED_FTP_SERVER) {
+ FTP_SERVER.stop();
+ }
+ }
+
+
+ /**
+ * Test connectivity to FTP server.
+ */
+ @Test
+ public void test0SeedFTPPut1() {
+ final TestRunner runnerPut = provisionTestRunner(PutFTP.class);
+ final String folderName = "folder0";
+ final MockFlowFile flowFile = new MockFlowFile(1L);
+ flowFile.putAttributes(Collections.singletonMap(CoreAttributes.FILENAME.key(), "0.txt"));
+ flowFile.setData(new Date().toString().getBytes(UTF_8));
+ runnerPut.enqueue(flowFile);
+ runnerPut.setValidateExpressionUsage(false);
+ runnerPut.setProperty(FTPTransfer.REMOTE_PATH, folderName);
+ runnerPut.setProperty(FTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString());
+ runnerPut.setProperty(FTPTransfer.DOT_RENAME, Boolean.FALSE.toString());
+ runnerPut.run();
+ runnerPut.assertTransferCount(PutFileTransfer.REL_FAILURE, 0);
+ runnerPut.assertTransferCount(PutFileTransfer.REL_REJECT, 0);
+ runnerPut.assertTransferCount(PutFileTransfer.REL_SUCCESS, 1);
+ }
+
+ /**
+ * Seed FTP server with data to be propagated.
+ */
+ @Test
+ public void test0SeedFTPPutAll() {
+ int id = 0;
+ final Object[] argumentsFirstFolder = folderNamesProvider().iterator().next().get();
+ final String folderName = Arrays.stream(argumentsFirstFolder).iterator().next().toString();
+ final TestRunner runnerPut = provisionTestRunner(PutFTP.class);
+ runnerPut.setValidateExpressionUsage(false);
+ runnerPut.setProperty(FTPTransfer.REMOTE_PATH, folderName);
+ runnerPut.setProperty(FTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString());
+ runnerPut.setProperty(FTPTransfer.DOT_RENAME, Boolean.FALSE.toString());
+
+ final Iterator<String> iteratorFilenames = filenamesProvider().iterator();
+ while (iteratorFilenames.hasNext()) {
+ final String filename = iteratorFilenames.next();
+ final MockFlowFile flowFile = new MockFlowFile(++id);
+ flowFile.putAttributes(Collections.singletonMap(CoreAttributes.FILENAME.key(), filename));
+ flowFile.setData(new Date().toString().getBytes(UTF_8));
+ runnerPut.enqueue(flowFile);
+ }
+ final int fileCount = (int) filenamesProvider().count();
+ runnerPut.run();
+ runnerPut.assertTransferCount(PutFileTransfer.REL_FAILURE, 0);
+ runnerPut.assertTransferCount(PutFileTransfer.REL_REJECT, 0);
+ runnerPut.assertTransferCount(PutFileTransfer.REL_SUCCESS, fileCount);
+ }
+
+ /**
+ * Verify that data has been successfully propagated.
+ */
+ @Test
+ public void test9FTPVerifyAll() {
+ final Set<String> filenamesExpected = filenamesProvider().collect(Collectors.toSet());
+
+ final Object[] argumentsLastFolder = folderNamesProvider()
+ .reduce((prev, next) -> next).orElseThrow(IllegalStateException::new).get();
+ final String folderName = Arrays.stream(argumentsLastFolder)
+ .reduce((prev, next) -> next).orElseThrow(IllegalStateException::new).toString();
+ final TestRunner runnerList = provisionTestRunner(ListFTP.class);
+ runnerList.setProperty(ListFileTransfer.FILE_TRANSFER_LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
+ runnerList.setProperty(FTPTransfer.REMOTE_PATH, folderName);
+ runnerList.clearTransferState();
+ runnerList.run(1);
+ runnerList.assertTransferCount(AbstractListProcessor.REL_SUCCESS, filenamesExpected.size());
+ final List<MockFlowFile> flowFilesList = runnerList.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesList) {
+ filenamesExpected.remove(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ }
+ assertTrue(filenamesExpected.isEmpty());
+ }
+
+ /**
+ * For each parameterized invocation, copy files using FTP protocol from source folder to target folder. This
+ * implicitly verifies charset handling of ListFTP, FetchFTP, and PutFTP processors.
+ *
+ * @param source the folder name from which to retrieve data
+ * @param target the folder name to which data should be copied
+ */
+ @ParameterizedTest
+ @MethodSource("folderNamesProvider")
+ public void test1FTP(final String source, final String target) {
+ final int fileCount = (int) filenamesProvider().count();
+ // ListFTP
+ final TestRunner runnerList = provisionTestRunner(ListFTP.class);
+ runnerList.setProperty(ListFileTransfer.FILE_TRANSFER_LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING);
+ runnerList.setProperty(FTPTransfer.REMOTE_PATH, source);
+ runnerList.clearTransferState();
+ runnerList.run(1);
+ runnerList.assertTransferCount(AbstractListProcessor.REL_SUCCESS, fileCount);
+ final List<MockFlowFile> flowFilesList = runnerList.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS);
+ // FetchFTP
+ final TestRunner runnerFetch = provisionTestRunner(FetchFTP.class);
+ for (MockFlowFile flowFile : flowFilesList) {
+ runnerFetch.enqueue(flowFile);
+ }
+ runnerFetch.setProperty(FetchFTP.REMOTE_FILENAME, "${path}/${filename}");
+ runnerFetch.run(flowFilesList.size());
+ runnerFetch.assertTransferCount(FetchFTP.REL_COMMS_FAILURE, 0);
+ runnerFetch.assertTransferCount(FetchFTP.REL_NOT_FOUND, 0);
+ runnerFetch.assertTransferCount(FetchFTP.REL_PERMISSION_DENIED, 0);
+ runnerFetch.assertTransferCount(FetchFileTransfer.REL_SUCCESS, fileCount);
+ final List<MockFlowFile> flowFilesFetch = runnerFetch.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS);
+ // PutFTP
+ final TestRunner runnerPut = provisionTestRunner(PutFTP.class);
+ for (MockFlowFile flowFile : flowFilesFetch) {
+ runnerPut.enqueue(flowFile);
+ }
+ runnerPut.setValidateExpressionUsage(false);
+ runnerPut.setProperty(FTPTransfer.REMOTE_PATH, target);
+ runnerPut.setProperty(FTPTransfer.CREATE_DIRECTORY, Boolean.TRUE.toString());
+ runnerPut.setProperty(FTPTransfer.DOT_RENAME, Boolean.FALSE.toString());
+ runnerPut.run(flowFilesList.size());
+ runnerPut.assertTransferCount(PutFileTransfer.REL_FAILURE, 0);
+ runnerPut.assertTransferCount(PutFileTransfer.REL_REJECT, 0);
+ runnerPut.assertTransferCount(PutFileTransfer.REL_SUCCESS, fileCount);
+ }
+
+ private static TestRunner provisionTestRunner(final Class<? extends Processor> processorClass) {
+ final TestRunner runner = TestRunners.newTestRunner(processorClass);
+ final Object[] serverParameters = serverParametersProvider().get();
+ int i = -1;
+ runner.setProperty(FTPTransfer.HOSTNAME, serverParameters[++i].toString());
+ runner.setProperty(FTPTransfer.PORT, serverParameters[++i].toString());
+ runner.setProperty(FTPTransfer.USERNAME, serverParameters[++i].toString());
+ runner.setProperty(FTPTransfer.PASSWORD, serverParameters[++i].toString());
+ runner.setProperty(FTPTransfer.UTF8_ENCODING, USE_UTF8);
+ runner.setProperty(FTPTransfer.CONNECTION_TIMEOUT, TIMEOUT);
+ runner.setProperty(FTPTransfer.DATA_TIMEOUT, TIMEOUT);
+ return runner;
+ }
+}