You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/08/14 20:58:52 UTC

[GitHub] [nifi] pgyori opened a new pull request #4481: NIFI-7624: ListenFTP processor

pgyori opened a new pull request #4481:
URL: https://github.com/apache/nifi/pull/4481


   https://issues.apache.org/jira/browse/NIFI-7624
   
   #### Description of PR
   
   A processor that starts an FTP server and converts all incoming files into FlowFiles.
   Folders can be created, viewed and deleted. Files do not get listed in the server's filesystem, as they are not stored like on a regular FTP server, but converted into FlowFiles and sent to the processor's SUCCESS relationship. Thus, files can only be put on this server, and not downloaded from it.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483063769



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")

Review comment:
       If the host computer has multiple network adapters, multiple network addresses will be available for the listener to be bound to. By default, the listener binds to all of these addresses (this happens when the property is not set). By setting this property we instruct the listener to only bind to one specific address, not all of the available addresses.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r485021600



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DefaultVirtualFileSystem implements VirtualFileSystem {
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final List<VirtualPath> existingPaths;
+
+    public DefaultVirtualFileSystem() {
+        existingPaths = new ArrayList<>();
+        existingPaths.add(ROOT);
+    }
+
+    @Override
+    public boolean mkdir(VirtualPath newFile) {
+        lock.writeLock().lock();
+        try {
+            if (existingPaths.contains(newFile)) {
+                return false;
+            } else {
+                if (existingPaths.contains(newFile.getParent())) {
+                    existingPaths.add(newFile);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean exists(VirtualPath virtualFile) {
+        lock.readLock().lock();
+        try {
+            return existingPaths.contains(virtualFile);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean delete(VirtualPath virtualFile) {
+        if (virtualFile.equals(ROOT)) { // Root cannot be deleted
+            return false;
+        }
+
+        lock.writeLock().lock();
+        try {
+            if (existingPaths.contains(virtualFile)) {
+                if (!hasSubDirectories(virtualFile)) {
+                    return existingPaths.remove(virtualFile);
+                }
+            }
+            return false;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private boolean hasSubDirectories(VirtualPath directory) {
+        return existingPaths.stream().anyMatch(e -> isChildOf(directory, e));
+    }
+
+    private boolean isChildOf(VirtualPath parent, VirtualPath childCandidate) {
+        if (childCandidate.equals(ROOT)) {
+            return false;
+        }
+        return parent.equals(childCandidate.getParent());
+    }
+
+    @Override
+    public List<VirtualPath> listChildren(VirtualPath parent) {
+        List<VirtualPath> children = new ArrayList<>();

Review comment:
       Yes, I thought about it. DefaultVirtualFileSystem is a naive implementation that can easily be replaced with a more advanced VirtualFileSystem implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483089557



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);

Review comment:
       We must make sure this check is performed even if the NON_BLANK_VALIDATOR is removed from the PropertyDescriptor for some reason. At this point I would rather remove the NON_BLANK_VALIDATOR than break the completeness of this check.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483101237



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);
+            validateAgainstEmptyString(password, PASSWORD, results);
+        }
+        return results;
+    }
+
+    private ValidationResult usernameOrPasswordIsNull(PropertyDescriptor nullProperty, PropertyDescriptor nonNullProperty) {

Review comment:
       Actually, this is more explicit this way when seen on the UI because it points out which property is left blank. In the ValidationResult, the problematic property is provided (the one that is null in this case), and the framework appends this error message to that. So when the username is left blank, it says "'Username' is invalid because 'Username' and 'Password' should either both be provided or none of them", and if the password is left blank, it says "'Password' is invalid because 'Password' and 'Username' should either both be provided or none of them". The variable order is a small trade-off for this piece of code to be reusable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r484903148



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+        try {
+            executeCommand(ftpSession, context, request);
+        } catch (FtpCommandException ftpCommandException) {
+            if (ftpCommandException.getSubId() == null) {
+                ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getBasicMessage()));
+            } else {
+                ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+                        ftpCommandException.getFtpReturnCode(),
+                        ftpCommandException.getSubId(),
+                        ftpCommandException.getBasicMessage(),
+                        ftpCommandException.getFtpFile()));
+            }
+        } finally {
+            ftpSession.resetState();
+            ftpSession.getDataConnection().closeDataConnection();
+        }
+    }
+
+    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+            throws FtpCommandException {
+
+        final String fileName = getArgument(request);
+
+        checkDataConnection(ftpSession);
+
+        final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+        checkWritePermission(ftpFile);
+
+        sendReturnCode150(ftpSession, context, request, ftpFile.getAbsolutePath());
+
+        final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
+
+        transferData(dataConnection, ftpSession, context, request, ftpFile);
+    }
+
+    private String getArgument(final FtpRequest request) throws FtpCommandException {
+        final String argument = request.getArgument();
+        if (argument == null) {
+            throw new FtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);

Review comment:
       This must be the subID because it is passed on to the ftpserver library which uses it as the key to look up the correct reply message which might be subject to translation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r485802826



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+    private final VirtualPath path;
+    private final VirtualFileSystem fileSystem;
+
+    public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (path == null || fileSystem == null) {
+            throw new IllegalArgumentException("File path and fileSystem cannot be null");
+        }
+        this.path = path;
+        this.fileSystem = fileSystem;
+    }
+
+    @Override
+    public String getAbsolutePath() {
+        return path.toString();
+    }
+
+    @Override
+    public String getName() {
+        return path.getFileName();
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return true; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean isFile() {
+        return false; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean doesExist() {
+        return fileSystem.exists(path);
+    }
+
+    @Override
+    public boolean isReadable() {
+        return true;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return true;
+    }
+
+    @Override
+    public boolean isRemovable() {
+        return true; //Every virtual directory can be deleted
+    }
+
+    @Override
+    public String getOwnerName() {
+        return "Owner";

Review comment:
       In the original FtpServer library, the getOwnerName() and getGroupName() methods also seem to be stubs, returning only the strings "user" and "group", so to keep things simple I followed this practice.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] simonbence commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r475555095



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")

Review comment:
       Could you please specify what are available addresses? Based on the code, my understanding is the following: only a singular bindAddress will be used in order to create a listener. That address is specified here. If it is not specified properly, the Listener will be created with incorrect URI, most probably resulting misbehaviour. It is possible that I did not realise some details, but could you please double-check? (and extend the description please)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "

Review comment:
       Minor:
   
   "Starts and FTP server which listens on the specified port and transforms uploaded files into FlowFiles" might sound a little more precise. Also, Service might not be capital

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
##########
@@ -390,6 +390,10 @@
             <version>1.12.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>

Review comment:
       I think this should be enough to be added with test scope

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+        try {
+            executeCommand(ftpSession, context, request);
+        } catch (FtpCommandException ftpCommandException) {
+            if (ftpCommandException.getSubId() == null) {
+                ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getBasicMessage()));
+            } else {
+                ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+                        ftpCommandException.getFtpReturnCode(),
+                        ftpCommandException.getSubId(),
+                        ftpCommandException.getBasicMessage(),
+                        ftpCommandException.getFtpFile()));
+            }
+        } finally {
+            ftpSession.resetState();
+            ftpSession.getDataConnection().closeDataConnection();
+        }
+    }
+
+    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+            throws FtpCommandException {
+
+        final String fileName = getArgument(request);
+
+        checkDataConnection(ftpSession);
+
+        final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+        checkWritePermission(ftpFile);
+
+        sendReturnCode150(ftpSession, context, request, ftpFile.getAbsolutePath());
+
+        final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
+
+        transferData(dataConnection, ftpSession, context, request, ftpFile);
+    }
+
+    private String getArgument(final FtpRequest request) throws FtpCommandException {
+        final String argument = request.getArgument();
+        if (argument == null) {
+            throw new FtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);
+        }
+        return argument;
+    }
+
+    private void checkDataConnection(final FtpIoSession ftpSession) throws FtpCommandException {
+        DataConnectionFactory dataConnectionFactory = ftpSession.getDataConnection();
+        if (dataConnectionFactory instanceof IODataConnectionFactory) {
+            InetAddress address = ((IODataConnectionFactory) dataConnectionFactory)
+                    .getInetAddress();
+            if (address == null) {
+                throw new FtpCommandException(FtpReply.REPLY_503_BAD_SEQUENCE_OF_COMMANDS, null, "PORT or PASV must be issued first", null);
+            }
+        }
+    }
+
+    private FtpFile getFtpFile(final FtpIoSession ftpSession, final String fileName) throws FtpCommandException {
+        FtpFile ftpFile = null;
+        try {
+            ftpFile = ftpSession.getFileSystemView().getFile(fileName);
+        } catch (FtpException e) {
+            LOG.error("Exception getting file object", e);
+        }
+        if (ftpFile == null) {
+            throw new FtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.invalid", fileName, ftpFile);
+        }
+        return ftpFile;
+    }
+
+    private void checkWritePermission(final FtpFile ftpFile) throws FtpCommandException {
+        if (!ftpFile.isWritable()) {
+            throw new FtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.permission", ftpFile.getAbsolutePath(), ftpFile);
+        }
+    }
+
+    private void sendReturnCode150(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request, final String fileAbsolutePath) {
+        ftpSession.write(LocalizedFtpReply.translate(ftpSession, request, context,
+                FtpReply.REPLY_150_FILE_STATUS_OKAY,
+                "STOR",
+                fileAbsolutePath)).awaitUninterruptibly(10000);
+    }
+
+    private DataConnection openDataConnection(final FtpIoSession ftpSession, final FtpFile ftpFile) throws FtpCommandException {
+        final DataConnection dataConnection;
+        try {
+            dataConnection = ftpSession.getDataConnection().openConnection();
+        } catch (Exception exception) {
+            LOG.error("Exception getting the input data stream", exception);
+            throw new FtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION,
+                    "STOR",
+                    ftpFile.getAbsolutePath(),
+                    ftpFile);
+        }
+        return dataConnection;
+    }
+
+    private void transferData(final DataConnection dataConnection, final FtpIoSession ftpSession,
+                              final FtpServerContext context, final FtpRequest request, final FtpFile ftpFile)
+            throws FtpCommandException {
+
+        final ProcessSession processSession;
+        try {
+            processSession = createProcessSession();
+        } catch (InterruptedException|TimeoutException exception) {
+            LOG.error("ProcessSession could not be acquired, command STOR aborted.", exception);
+            throw new FtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION, null, "File transfer failed.", null);
+        }
+        FlowFile flowFile = processSession.create();
+        long transferredBytes = 0L;
+        try (OutputStream flowFileOutputStream = processSession.write(flowFile)) {
+            transferredBytes = dataConnection.transferFromClient(ftpSession.getFtpletSession(), flowFileOutputStream);
+            LOG.info("File received {}", ftpFile.getAbsolutePath());
+        } catch (SocketException socketException) {
+            LOG.error("Socket exception during data transfer", socketException);
+            processSession.rollback();
+            throw new FtpCommandException(FtpReply.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
+                    "STOR",

Review comment:
       Will this STOR subId will be truth in case of STOU command? (In which case STOR runs at the moment)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);
+            validateAgainstEmptyString(password, PASSWORD, results);
+        }
+        return results;
+    }
+
+    private ValidationResult usernameOrPasswordIsNull(PropertyDescriptor nullProperty, PropertyDescriptor nonNullProperty) {
+        String explanation = String.format("'%s' and '%s' should either both be provided or none of them", nullProperty.getDisplayName(), nonNullProperty.getDisplayName());
+        return createValidationResult(nullProperty.getDisplayName(), explanation);
+    }
+
+    private void validateAgainstEmptyString(String propertyValue, PropertyDescriptor property, Collection<ValidationResult> validationResults) {
+        if (propertyValue.isBlank()) {
+            if (propertyValue.isEmpty()) {
+                validationResults.add(propertyIsEmptyString(property));
+            } else {
+                validationResults.add(propertyContainsOnlyWhitespace(property));
+            }
+        }
+    }
+
+    private ValidationResult propertyIsEmptyString(PropertyDescriptor property) {
+        String explanation = String.format("'%s' cannot be an empty string", property.getDisplayName());
+        return createValidationResult(property.getDisplayName(), explanation);
+    }
+
+    private ValidationResult propertyContainsOnlyWhitespace(PropertyDescriptor property) {

Review comment:
       I think it would be nicer to add this into the PropertyDescriptor(s) as an additional validator

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FtpCommandHELP extends AbstractCommand {
+
+    private static Map<String, String> COMMAND_SPECIFIC_HELP;
+    private Set<String> availableCommands = new TreeSet<>();
+
+    static {
+        Map<String, String> commands = new HashMap<>();
+        commands.put("ABOR", "Syntax: ABOR");
+        commands.put("APPE", "Syntax: APPE <sp> <pathname>");
+        commands.put("AUTH", "Syntax: AUTH <sp> <security_mechanism>");
+        commands.put("CDUP", "Syntax: CDUP");
+        commands.put("CWD", "Syntax: CWD <sp> <pathname>");
+        commands.put("DELE", "Syntax: DELE <sp> <pathname>");
+        commands.put("EPRT", "Syntax: EPRT<space><d><net-prt><d><net-addr><d><tcp-port><d>");
+        commands.put("EPSV", "Syntax: EPSV");
+        commands.put("FEAT", "Syntax: FEAT");
+        commands.put("HELP", "Syntax: HELP [<sp> <string>]");
+        commands.put("LIST", "Syntax: LIST [<sp> <pathname>]");
+        commands.put("MDTM", "Syntax: MDTM <sp> <pathname>");
+        commands.put("MKD", "Syntax: MKD <sp> <pathname>");
+        commands.put("MLSD", "Syntax: MLSD [<sp> <pathname>]");
+        commands.put("MLST", "Syntax: MLST [<sp> <pathname>]");
+        commands.put("MODE", "Syntax: MODE <sp> <mode-code>");
+        commands.put("NLST", "Syntax: NLST [<sp> <pathname>]");
+        commands.put("NOOP", "Syntax: NOOP");
+        commands.put("OPTS", "Syntax: OPTS <sp> <options>");
+        commands.put("PASS", "Syntax: PASS <sp> <password>");
+        commands.put("PASV", "Syntax: PASV");
+        commands.put("PBSZ", "Syntax: PBSZ <sp> <buffer_size>");
+        commands.put("PORT", "Syntax: PORT <sp> <host-port>");
+        commands.put("PROT", "Syntax: PROT <sp> <protection_level>");
+        commands.put("PWD", "Syntax: PWD");
+        commands.put("QUIT", "Syntax: QUIT");
+        commands.put("REIN", "Syntax: REIN");
+        commands.put("REST", "Syntax: REST <sp> <marker>");
+        commands.put("RETR", "Syntax: RETR <sp> <pathname>");
+        commands.put("RMD", "Syntax: RMD <sp> <pathname>");
+        commands.put("RNFR", "Syntax: RNFR <sp> <pathname>");
+        commands.put("RNTO", "Syntax: RNTO <sp> <pathname>");
+        commands.put("SITE", "Syntax: SITE <sp> <string>");
+        commands.put("SIZE", "Syntax: SIZE <sp> <pathname>");
+        commands.put("STAT", "Syntax: STAT [<sp> <pathname>]");
+        commands.put("STOR", "Syntax: STOR <sp> <pathname>");
+        commands.put("STOU", "Syntax: STOU");
+        commands.put("SYST", "Syntax: SYST");
+        commands.put("TYPE", "Syntax: TYPE <sp> <type-code>");
+        commands.put("USER", "Syntax: USER <sp> <username>");
+        COMMAND_SPECIFIC_HELP = Collections.unmodifiableMap(commands);
+    }
+
+    public void addCommand(String command) {
+        if (!command.startsWith("SITE_")) { // Parameterized commands of SITE will not appear in the general help.
+            availableCommands.add(command);
+        }
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession session,
+                        final FtpServerContext context, final FtpRequest request) {
+
+        // reset state variables
+        session.resetState();
+
+        if (!request.hasArgument()) {
+            sendDefaultHelpMessage(session);
+        } else {
+            handleRequestWithArgument(session, request);
+        }
+    }
+
+    private void sendDefaultHelpMessage(FtpIoSession session) {
+        sendCustomHelpMessage(session, getDefaultHelpMessage());
+    }
+
+    private String getDefaultHelpMessage() {
+        StringBuffer helpMessage = new StringBuffer("The following commands are supported.\n");
+        int currentNumberOfCommandsInARow = 0;
+        int maxNumberOfCommandsInARow = 5;
+        for (String command : availableCommands) {
+            if (currentNumberOfCommandsInARow == maxNumberOfCommandsInARow) {
+                helpMessage.append("\n");
+                currentNumberOfCommandsInARow = 0;
+            }
+            helpMessage.append(command + ", ");

Review comment:
       Minor: If I am right, this will generate a comma at the end. Java Stream API provides Collectors.joining(), which helps with this (of course, the line end must be added separately)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+        try {
+            executeCommand(ftpSession, context, request);
+        } catch (FtpCommandException ftpCommandException) {
+            if (ftpCommandException.getSubId() == null) {
+                ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getBasicMessage()));
+            } else {
+                ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+                        ftpCommandException.getFtpReturnCode(),
+                        ftpCommandException.getSubId(),
+                        ftpCommandException.getBasicMessage(),
+                        ftpCommandException.getFtpFile()));
+            }
+        } finally {
+            ftpSession.resetState();
+            ftpSession.getDataConnection().closeDataConnection();
+        }
+    }
+
+    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+            throws FtpCommandException {
+
+        final String fileName = getArgument(request);
+
+        checkDataConnection(ftpSession);
+
+        final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+        checkWritePermission(ftpFile);
+
+        sendReturnCode150(ftpSession, context, request, ftpFile.getAbsolutePath());

Review comment:
       Naming: sendFileStatusOkay or alike would be more descriptive

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DefaultVirtualFileSystem implements VirtualFileSystem {
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final List<VirtualPath> existingPaths;

Review comment:
       If the ordering of the paths are not important, I would suggest to use a set implementation to declaratively provide safety against duplication

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+    private final VirtualPath path;
+    private final VirtualFileSystem fileSystem;
+
+    public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (path == null || fileSystem == null) {
+            throw new IllegalArgumentException("File path and fileSystem cannot be null");
+        }
+        this.path = path;
+        this.fileSystem = fileSystem;
+    }
+
+    @Override
+    public String getAbsolutePath() {
+        return path.toString();
+    }
+
+    @Override
+    public String getName() {
+        return path.getFileName();
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return true; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean isFile() {
+        return false; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean doesExist() {
+        return fileSystem.exists(path);
+    }
+
+    @Override
+    public boolean isReadable() {
+        return true;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return true;
+    }
+
+    @Override
+    public boolean isRemovable() {
+        return true; //Every virtual directory can be deleted
+    }
+
+    @Override
+    public String getOwnerName() {
+        return "Owner";
+    }
+
+    @Override
+    public String getGroupName() {
+        return "Group";
+    }
+
+    @Override
+    public int getLinkCount() {
+        return 1;
+    }
+
+    @Override
+    public long getLastModified() {
+        return Calendar.getInstance().getTimeInMillis();

Review comment:
       Minor: why not add a creation time timestamp during construction time?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)

Review comment:
       Would not StandardValidators.URI_VALIDATOR be more accurate?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")

Review comment:
       Maybe "If not specified" or "If not set"

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +

Review comment:
       Minor: "If the Username..."

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);

Review comment:
       Is not this check already done in the PropertyDescriptors?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.

Review comment:
       This comments is unnecessary. I suggest either to remove it or provide more details.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandHELP;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandSTOR;
+import org.apache.nifi.processors.standard.ftp.commands.NotSupportedCommand;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer {
+
+    private final Map<String, Command> commandMap = new HashMap<>();
+    private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+
+    private final FtpServer server;
+    private static final String HOME_DIRECTORY = "/virtual/ftproot";
+
+    private NifiFtpServer(Builder builder) throws ProcessException {
+        try {
+            initializeCommandMap(builder.sessionFactory, builder.sessionFactorySetSignal);
+
+            VirtualFileSystem fileSystem = new DefaultVirtualFileSystem();
+            boolean anonymousLoginEnabled = (builder.username == null);
+
+            FtpServerFactory serverFactory = new FtpServerFactory();
+            serverFactory.setFileSystem(new VirtualFileSystemFactory(fileSystem));
+            serverFactory.setCommandFactory(createCommandFactory(commandMap));
+            serverFactory.setConnectionConfig(createConnectionConfig(anonymousLoginEnabled));
+            serverFactory.addListener("default", createListener(builder.bindAddress, builder.port));
+            if (anonymousLoginEnabled) {
+                serverFactory.getUserManager().save(createAnonymousUser(HOME_DIRECTORY, Collections.singletonList(new WritePermission())));
+            } else {
+                serverFactory.getUserManager().save(createUser(builder.username, builder.password, HOME_DIRECTORY, Collections.singletonList(new WritePermission())));
+            }
+            server = serverFactory.createServer();
+        } catch (Exception exception) {
+            throw new ProcessException("FTP server could not be started.", exception);
+        }
+    }
+
+    public void start() throws ProcessException {
+        try {
+            server.start();
+        } catch (Exception exception) {
+            throw new ProcessException("FTP server could not be started.", exception);
+        }
+    }
+
+    public void stop() {
+        server.stop();
+    }
+
+    public boolean isStopped() {
+        return server.isStopped();
+    }
+
+    private CommandFactory createCommandFactory(Map<String, Command> commandMap) {
+        CommandFactoryFactory commandFactoryFactory = new CommandFactoryFactory();
+        commandFactoryFactory.setUseDefaultCommands(false);
+        commandFactoryFactory.setCommandMap(commandMap);
+        return commandFactoryFactory.createCommandFactory();
+    }
+
+    private ConnectionConfig createConnectionConfig(boolean anonymousLoginEnabled) {
+        ConnectionConfigFactory connectionConfigFactory = new ConnectionConfigFactory();
+        connectionConfigFactory.setAnonymousLoginEnabled(anonymousLoginEnabled);
+        return connectionConfigFactory.createConnectionConfig();
+    }
+
+    private Listener createListener(String bindAddress, int port) throws FtpServerConfigurationException {
+        ListenerFactory listenerFactory = new ListenerFactory();
+        listenerFactory.setServerAddress(bindAddress);
+        listenerFactory.setPort(port);
+        return listenerFactory.createListener();
+    }
+
+    private User createUser(String username, String password, String homeDirectory, List<Authority> authorities) {
+        BaseUser user = new BaseUser();
+        user.setName(username);
+        user.setPassword(password);
+        user.setHomeDirectory(homeDirectory);
+        user.setAuthorities(authorities);
+        return user;
+    }
+
+    private User createAnonymousUser(String homeDirectory, List<Authority> authorities) {
+        BaseUser user = new BaseUser();
+        user.setName("anonymous");
+        user.setHomeDirectory(homeDirectory);
+        user.setAuthorities(authorities);
+        return user;
+    }
+
+    private void initializeCommandMap(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        addToCommandMap("ABOR", new ABOR());
+        addToCommandMap("ACCT", new NotSupportedCommand("Operation (ACCT) not supported."));
+        addToCommandMap("APPE", new NotSupportedCommand("Operation (APPE) not supported."));
+        addToCommandMap("AUTH", new AUTH());
+        addToCommandMap("CDUP", new CDUP());
+        addToCommandMap("CWD", new CWD());
+        addToCommandMap("DELE", new NotSupportedCommand("Operation (DELE) not supported."));
+        addToCommandMap("EPRT", new EPRT());
+        addToCommandMap("EPSV", new EPSV());
+        addToCommandMap("FEAT", new FEAT());
+        addToCommandMap("HELP", customHelpCommand);
+        addToCommandMap("LIST", new LIST());
+        addToCommandMap("MFMT", new NotSupportedCommand("Operation (MFMT) not supported."));
+        addToCommandMap("MDTM", new MDTM());
+        addToCommandMap("MLST", new MLST());
+        addToCommandMap("MKD", new MKD());
+        addToCommandMap("MLSD", new MLSD());
+        addToCommandMap("MODE", new MODE());
+        addToCommandMap("NLST", new NLST());
+        addToCommandMap("NOOP", new NOOP());
+        addToCommandMap("OPTS", new OPTS());
+        addToCommandMap("PASS", new PASS());
+        addToCommandMap("PASV", new PASV());
+        addToCommandMap("PBSZ", new PBSZ());
+        addToCommandMap("PORT", new PORT());
+        addToCommandMap("PROT", new PROT());
+        addToCommandMap("PWD", new PWD());
+        addToCommandMap("QUIT", new QUIT());
+        addToCommandMap("REIN", new REIN());
+        addToCommandMap("REST", new NotSupportedCommand("Operation (REST) not supported."));
+        addToCommandMap("RETR", new NotSupportedCommand("Operation (RETR) not supported."));
+        addToCommandMap("RMD", new RMD());
+        //addToCommandMap("RNFR", new RNFR());

Review comment:
       Will this be available?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);
+            validateAgainstEmptyString(password, PASSWORD, results);
+        }
+        return results;
+    }
+
+    private ValidationResult usernameOrPasswordIsNull(PropertyDescriptor nullProperty, PropertyDescriptor nonNullProperty) {

Review comment:
       I think this is misleading (username and password in variable order). It would be sufficient to mention that both should be provided or not.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+        try {
+            executeCommand(ftpSession, context, request);
+        } catch (FtpCommandException ftpCommandException) {
+            if (ftpCommandException.getSubId() == null) {
+                ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getBasicMessage()));
+            } else {
+                ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+                        ftpCommandException.getFtpReturnCode(),
+                        ftpCommandException.getSubId(),
+                        ftpCommandException.getBasicMessage(),
+                        ftpCommandException.getFtpFile()));
+            }
+        } finally {
+            ftpSession.resetState();
+            ftpSession.getDataConnection().closeDataConnection();
+        }
+    }
+
+    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+            throws FtpCommandException {
+
+        final String fileName = getArgument(request);
+
+        checkDataConnection(ftpSession);
+
+        final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+        checkWritePermission(ftpFile);
+
+        sendReturnCode150(ftpSession, context, request, ftpFile.getAbsolutePath());
+
+        final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
+
+        transferData(dataConnection, ftpSession, context, request, ftpFile);
+    }
+
+    private String getArgument(final FtpRequest request) throws FtpCommandException {
+        final String argument = request.getArgument();
+        if (argument == null) {
+            throw new FtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);
+        }
+        return argument;
+    }
+
+    private void checkDataConnection(final FtpIoSession ftpSession) throws FtpCommandException {
+        DataConnectionFactory dataConnectionFactory = ftpSession.getDataConnection();
+        if (dataConnectionFactory instanceof IODataConnectionFactory) {
+            InetAddress address = ((IODataConnectionFactory) dataConnectionFactory)
+                    .getInetAddress();
+            if (address == null) {
+                throw new FtpCommandException(FtpReply.REPLY_503_BAD_SEQUENCE_OF_COMMANDS, null, "PORT or PASV must be issued first", null);
+            }
+        }
+    }
+
+    private FtpFile getFtpFile(final FtpIoSession ftpSession, final String fileName) throws FtpCommandException {
+        FtpFile ftpFile = null;
+        try {
+            ftpFile = ftpSession.getFileSystemView().getFile(fileName);
+        } catch (FtpException e) {
+            LOG.error("Exception getting file object", e);
+        }
+        if (ftpFile == null) {
+            throw new FtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.invalid", fileName, ftpFile);
+        }
+        return ftpFile;
+    }
+
+    private void checkWritePermission(final FtpFile ftpFile) throws FtpCommandException {
+        if (!ftpFile.isWritable()) {
+            throw new FtpCommandException(FtpReply.REPLY_550_REQUESTED_ACTION_NOT_TAKEN, "STOR.permission", ftpFile.getAbsolutePath(), ftpFile);
+        }
+    }
+
+    private void sendReturnCode150(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request, final String fileAbsolutePath) {
+        ftpSession.write(LocalizedFtpReply.translate(ftpSession, request, context,
+                FtpReply.REPLY_150_FILE_STATUS_OKAY,
+                "STOR",
+                fileAbsolutePath)).awaitUninterruptibly(10000);
+    }
+
+    private DataConnection openDataConnection(final FtpIoSession ftpSession, final FtpFile ftpFile) throws FtpCommandException {
+        final DataConnection dataConnection;
+        try {
+            dataConnection = ftpSession.getDataConnection().openConnection();
+        } catch (Exception exception) {
+            LOG.error("Exception getting the input data stream", exception);
+            throw new FtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION,
+                    "STOR",
+                    ftpFile.getAbsolutePath(),
+                    ftpFile);
+        }
+        return dataConnection;
+    }
+
+    private void transferData(final DataConnection dataConnection, final FtpIoSession ftpSession,
+                              final FtpServerContext context, final FtpRequest request, final FtpFile ftpFile)
+            throws FtpCommandException {
+
+        final ProcessSession processSession;
+        try {
+            processSession = createProcessSession();
+        } catch (InterruptedException|TimeoutException exception) {
+            LOG.error("ProcessSession could not be acquired, command STOR aborted.", exception);
+            throw new FtpCommandException(FtpReply.REPLY_425_CANT_OPEN_DATA_CONNECTION, null, "File transfer failed.", null);
+        }
+        FlowFile flowFile = processSession.create();
+        long transferredBytes = 0L;
+        try (OutputStream flowFileOutputStream = processSession.write(flowFile)) {
+            transferredBytes = dataConnection.transferFromClient(ftpSession.getFtpletSession(), flowFileOutputStream);
+            LOG.info("File received {}", ftpFile.getAbsolutePath());
+        } catch (SocketException socketException) {
+            LOG.error("Socket exception during data transfer", socketException);
+            processSession.rollback();
+            throw new FtpCommandException(FtpReply.REPLY_426_CONNECTION_CLOSED_TRANSFER_ABORTED,
+                    "STOR",
+                    ftpFile.getAbsolutePath(),
+                    ftpFile);
+        } catch (IOException ioException) {
+            LOG.error("IOException during data transfer", ioException);
+            processSession.rollback();
+            throw new FtpCommandException(FtpReply.REPLY_551_REQUESTED_ACTION_ABORTED_PAGE_TYPE_UNKNOWN,
+                    "STOR",
+                    ftpFile.getAbsolutePath(),
+                    ftpFile);
+        }
+
+        try {
+            // notify the statistics component
+            ServerFtpStatistics ftpStat = (ServerFtpStatistics) context.getFtpStatistics();
+            ftpStat.setUpload(ftpSession, ftpFile, transferredBytes);
+
+            processSession.putAttribute(flowFile, CoreAttributes.FILENAME.key(), ftpFile.getName());
+            processSession.putAttribute(flowFile, CoreAttributes.PATH.key(), getPath(ftpFile));
+
+            processSession.getProvenanceReporter().modifyContent(flowFile);
+
+            processSession.transfer(flowFile, ListenFTP.RELATIONSHIP_SUCCESS);

Review comment:
       This creates a circular dependency on class level between ListenFtp >>> NiFiFtpServer >>> FtpCommandSTOR >>> ListenFtp. It would be better to provide a Relationship instance via constructor / setter which would happened to be this instance

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandHELP;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandSTOR;
+import org.apache.nifi.processors.standard.ftp.commands.NotSupportedCommand;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer {
+
+    private final Map<String, Command> commandMap = new HashMap<>();
+    private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+
+    private final FtpServer server;

Review comment:
       It might be my personal preference but I think, this class has too many responsibilities. Two things might be extracted:
   - Creation of the FtpServer instance
   - The semi-declarative list of available commands
   
   I think, it would be more readable if only the "business logic" would still remain, all the wiring/building would be moved in other place(s)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FtpCommandHELP extends AbstractCommand {
+
+    private static Map<String, String> COMMAND_SPECIFIC_HELP;
+    private Set<String> availableCommands = new TreeSet<>();
+
+    static {
+        Map<String, String> commands = new HashMap<>();

Review comment:
       I really find this duplication of the command list unnecessary. I think a more elegant way to do this would be: adding a 'String getHelpText()' method to the AbstractCommand and when implementing, return these strings. Then provide the map reference to the help. (Support might be flagged as well)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandHELP;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandSTOR;
+import org.apache.nifi.processors.standard.ftp.commands.NotSupportedCommand;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer {
+
+    private final Map<String, Command> commandMap = new HashMap<>();
+    private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+
+    private final FtpServer server;
+    private static final String HOME_DIRECTORY = "/virtual/ftproot";
+
+    private NifiFtpServer(Builder builder) throws ProcessException {
+        try {
+            initializeCommandMap(builder.sessionFactory, builder.sessionFactorySetSignal);
+
+            VirtualFileSystem fileSystem = new DefaultVirtualFileSystem();

Review comment:
       Why not wrap this into the VirtualFileSystemFactory?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandException.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+public class FtpCommandException extends Exception {
+
+    private final int ftpReturnCode;
+    private final String subId;
+    private final String basicMessage;
+    private final FtpFile ftpFile;
+
+    public FtpCommandException(int ftpReturnCode, String subId, String basicMessage, FtpFile ftpFile) {
+        super(subId);
+        this.ftpReturnCode = ftpReturnCode;
+        this.subId = subId;
+        this.basicMessage = basicMessage;

Review comment:
       You could hook on Exception's message attribute instead of introducing a new one (in case, it's not utilised elsewhere). The primary advantage is that, you would be able to properly depend on the toString from Throwable.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandHELP;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandSTOR;
+import org.apache.nifi.processors.standard.ftp.commands.NotSupportedCommand;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer {
+
+    private final Map<String, Command> commandMap = new HashMap<>();
+    private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+
+    private final FtpServer server;
+    private static final String HOME_DIRECTORY = "/virtual/ftproot";
+
+    private NifiFtpServer(Builder builder) throws ProcessException {
+        try {
+            initializeCommandMap(builder.sessionFactory, builder.sessionFactorySetSignal);
+
+            VirtualFileSystem fileSystem = new DefaultVirtualFileSystem();
+            boolean anonymousLoginEnabled = (builder.username == null);
+
+            FtpServerFactory serverFactory = new FtpServerFactory();
+            serverFactory.setFileSystem(new VirtualFileSystemFactory(fileSystem));
+            serverFactory.setCommandFactory(createCommandFactory(commandMap));
+            serverFactory.setConnectionConfig(createConnectionConfig(anonymousLoginEnabled));
+            serverFactory.addListener("default", createListener(builder.bindAddress, builder.port));
+            if (anonymousLoginEnabled) {
+                serverFactory.getUserManager().save(createAnonymousUser(HOME_DIRECTORY, Collections.singletonList(new WritePermission())));
+            } else {
+                serverFactory.getUserManager().save(createUser(builder.username, builder.password, HOME_DIRECTORY, Collections.singletonList(new WritePermission())));
+            }
+            server = serverFactory.createServer();
+        } catch (Exception exception) {
+            throw new ProcessException("FTP server could not be started.", exception);
+        }
+    }
+
+    public void start() throws ProcessException {
+        try {
+            server.start();
+        } catch (Exception exception) {
+            throw new ProcessException("FTP server could not be started.", exception);
+        }
+    }
+
+    public void stop() {
+        server.stop();
+    }
+
+    public boolean isStopped() {
+        return server.isStopped();
+    }
+
+    private CommandFactory createCommandFactory(Map<String, Command> commandMap) {
+        CommandFactoryFactory commandFactoryFactory = new CommandFactoryFactory();
+        commandFactoryFactory.setUseDefaultCommands(false);
+        commandFactoryFactory.setCommandMap(commandMap);
+        return commandFactoryFactory.createCommandFactory();
+    }
+
+    private ConnectionConfig createConnectionConfig(boolean anonymousLoginEnabled) {
+        ConnectionConfigFactory connectionConfigFactory = new ConnectionConfigFactory();
+        connectionConfigFactory.setAnonymousLoginEnabled(anonymousLoginEnabled);
+        return connectionConfigFactory.createConnectionConfig();
+    }
+
+    private Listener createListener(String bindAddress, int port) throws FtpServerConfigurationException {
+        ListenerFactory listenerFactory = new ListenerFactory();
+        listenerFactory.setServerAddress(bindAddress);
+        listenerFactory.setPort(port);
+        return listenerFactory.createListener();
+    }
+
+    private User createUser(String username, String password, String homeDirectory, List<Authority> authorities) {
+        BaseUser user = new BaseUser();
+        user.setName(username);
+        user.setPassword(password);
+        user.setHomeDirectory(homeDirectory);
+        user.setAuthorities(authorities);
+        return user;
+    }
+
+    private User createAnonymousUser(String homeDirectory, List<Authority> authorities) {
+        BaseUser user = new BaseUser();
+        user.setName("anonymous");
+        user.setHomeDirectory(homeDirectory);
+        user.setAuthorities(authorities);
+        return user;
+    }
+
+    private void initializeCommandMap(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        addToCommandMap("ABOR", new ABOR());
+        addToCommandMap("ACCT", new NotSupportedCommand("Operation (ACCT) not supported."));
+        addToCommandMap("APPE", new NotSupportedCommand("Operation (APPE) not supported."));
+        addToCommandMap("AUTH", new AUTH());
+        addToCommandMap("CDUP", new CDUP());
+        addToCommandMap("CWD", new CWD());
+        addToCommandMap("DELE", new NotSupportedCommand("Operation (DELE) not supported."));
+        addToCommandMap("EPRT", new EPRT());
+        addToCommandMap("EPSV", new EPSV());
+        addToCommandMap("FEAT", new FEAT());
+        addToCommandMap("HELP", customHelpCommand);
+        addToCommandMap("LIST", new LIST());
+        addToCommandMap("MFMT", new NotSupportedCommand("Operation (MFMT) not supported."));
+        addToCommandMap("MDTM", new MDTM());
+        addToCommandMap("MLST", new MLST());
+        addToCommandMap("MKD", new MKD());
+        addToCommandMap("MLSD", new MLSD());
+        addToCommandMap("MODE", new MODE());
+        addToCommandMap("NLST", new NLST());
+        addToCommandMap("NOOP", new NOOP());
+        addToCommandMap("OPTS", new OPTS());
+        addToCommandMap("PASS", new PASS());
+        addToCommandMap("PASV", new PASV());
+        addToCommandMap("PBSZ", new PBSZ());
+        addToCommandMap("PORT", new PORT());
+        addToCommandMap("PROT", new PROT());
+        addToCommandMap("PWD", new PWD());
+        addToCommandMap("QUIT", new QUIT());
+        addToCommandMap("REIN", new REIN());
+        addToCommandMap("REST", new NotSupportedCommand("Operation (REST) not supported."));
+        addToCommandMap("RETR", new NotSupportedCommand("Operation (RETR) not supported."));
+        addToCommandMap("RMD", new RMD());
+        //addToCommandMap("RNFR", new RNFR());
+        //addToCommandMap("RNTO", new RNTO());
+        addToCommandMap("SITE", new SITE());
+        addToCommandMap("SIZE", new SIZE());
+        addToCommandMap("SITE_DESCUSER", new SITE_DESCUSER());
+        addToCommandMap("SITE_HELP", new SITE_HELP());
+        addToCommandMap("SITE_STAT", new SITE_STAT());
+        addToCommandMap("SITE_WHO", new SITE_WHO());
+        addToCommandMap("SITE_ZONE", new SITE_ZONE());
+
+        addToCommandMap("STAT", new STAT());
+        addToCommandMap("STOR", new FtpCommandSTOR(sessionFactory, sessionFactorySetSignal));
+        addToCommandMap("STOU", new FtpCommandSTOR(sessionFactory, sessionFactorySetSignal));

Review comment:
       If this deliberately uses STOR instead of STOU, please leave a comment in the code for avoid later confusion

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FtpCommandHELP extends AbstractCommand {
+
+    private static Map<String, String> COMMAND_SPECIFIC_HELP;
+    private Set<String> availableCommands = new TreeSet<>();
+
+    static {
+        Map<String, String> commands = new HashMap<>();
+        commands.put("ABOR", "Syntax: ABOR");
+        commands.put("APPE", "Syntax: APPE <sp> <pathname>");
+        commands.put("AUTH", "Syntax: AUTH <sp> <security_mechanism>");
+        commands.put("CDUP", "Syntax: CDUP");
+        commands.put("CWD", "Syntax: CWD <sp> <pathname>");
+        commands.put("DELE", "Syntax: DELE <sp> <pathname>");
+        commands.put("EPRT", "Syntax: EPRT<space><d><net-prt><d><net-addr><d><tcp-port><d>");
+        commands.put("EPSV", "Syntax: EPSV");
+        commands.put("FEAT", "Syntax: FEAT");
+        commands.put("HELP", "Syntax: HELP [<sp> <string>]");
+        commands.put("LIST", "Syntax: LIST [<sp> <pathname>]");
+        commands.put("MDTM", "Syntax: MDTM <sp> <pathname>");
+        commands.put("MKD", "Syntax: MKD <sp> <pathname>");
+        commands.put("MLSD", "Syntax: MLSD [<sp> <pathname>]");
+        commands.put("MLST", "Syntax: MLST [<sp> <pathname>]");
+        commands.put("MODE", "Syntax: MODE <sp> <mode-code>");
+        commands.put("NLST", "Syntax: NLST [<sp> <pathname>]");
+        commands.put("NOOP", "Syntax: NOOP");
+        commands.put("OPTS", "Syntax: OPTS <sp> <options>");
+        commands.put("PASS", "Syntax: PASS <sp> <password>");
+        commands.put("PASV", "Syntax: PASV");
+        commands.put("PBSZ", "Syntax: PBSZ <sp> <buffer_size>");
+        commands.put("PORT", "Syntax: PORT <sp> <host-port>");
+        commands.put("PROT", "Syntax: PROT <sp> <protection_level>");
+        commands.put("PWD", "Syntax: PWD");
+        commands.put("QUIT", "Syntax: QUIT");
+        commands.put("REIN", "Syntax: REIN");
+        commands.put("REST", "Syntax: REST <sp> <marker>");
+        commands.put("RETR", "Syntax: RETR <sp> <pathname>");
+        commands.put("RMD", "Syntax: RMD <sp> <pathname>");
+        commands.put("RNFR", "Syntax: RNFR <sp> <pathname>");
+        commands.put("RNTO", "Syntax: RNTO <sp> <pathname>");
+        commands.put("SITE", "Syntax: SITE <sp> <string>");
+        commands.put("SIZE", "Syntax: SIZE <sp> <pathname>");
+        commands.put("STAT", "Syntax: STAT [<sp> <pathname>]");
+        commands.put("STOR", "Syntax: STOR <sp> <pathname>");
+        commands.put("STOU", "Syntax: STOU");
+        commands.put("SYST", "Syntax: SYST");
+        commands.put("TYPE", "Syntax: TYPE <sp> <type-code>");
+        commands.put("USER", "Syntax: USER <sp> <username>");
+        COMMAND_SPECIFIC_HELP = Collections.unmodifiableMap(commands);
+    }
+
+    public void addCommand(String command) {
+        if (!command.startsWith("SITE_")) { // Parameterized commands of SITE will not appear in the general help.
+            availableCommands.add(command);
+        }
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession session,
+                        final FtpServerContext context, final FtpRequest request) {
+
+        // reset state variables
+        session.resetState();
+
+        if (!request.hasArgument()) {
+            sendDefaultHelpMessage(session);
+        } else {
+            handleRequestWithArgument(session, request);
+        }
+    }
+
+    private void sendDefaultHelpMessage(FtpIoSession session) {
+        sendCustomHelpMessage(session, getDefaultHelpMessage());
+    }
+
+    private String getDefaultHelpMessage() {
+        StringBuffer helpMessage = new StringBuffer("The following commands are supported.\n");
+        int currentNumberOfCommandsInARow = 0;
+        int maxNumberOfCommandsInARow = 5;

Review comment:
       Minor: constants like this might be real constants

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DefaultVirtualFileSystem implements VirtualFileSystem {
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final List<VirtualPath> existingPaths;
+
+    public DefaultVirtualFileSystem() {
+        existingPaths = new ArrayList<>();

Review comment:
       Minor: why not creating during declaration?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemView.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualFileSystemView implements FileSystemView {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualFileSystemView.class);
+    private final VirtualFileSystem fileSystem;
+    private VirtualPath currentDirectory = VirtualFileSystem.ROOT;
+
+    public VirtualFileSystemView(User user, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (user == null || fileSystem == null) {
+            throw new IllegalArgumentException("User and filesystem cannot be null.");
+        } else {
+            LOG.info("Virtual filesystem view created for user \"{}\"", user.getName());
+            this.fileSystem = fileSystem;
+        }
+    }
+
+    @Override
+    public FtpFile getHomeDirectory() {
+        return new VirtualFtpFile(VirtualFileSystem.ROOT, fileSystem);
+    }
+
+    @Override
+    public FtpFile getWorkingDirectory() {
+        return new VirtualFtpFile(currentDirectory, fileSystem);
+    }
+
+    @Override
+    public boolean changeWorkingDirectory(String targetPath) {
+        VirtualPath targetDirectory = currentDirectory.resolve(targetPath);
+        if (fileSystem.exists(targetDirectory)) {
+            currentDirectory = targetDirectory;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public FtpFile getFile(String fileName) throws FtpException {
+        VirtualPath filePath = currentDirectory.resolve(fileName);
+        VirtualPath parent = filePath.getParent();
+        if (parent != null) {

Review comment:
       Minor: please merge if condtions

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DefaultVirtualFileSystem implements VirtualFileSystem {
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final List<VirtualPath> existingPaths;
+
+    public DefaultVirtualFileSystem() {
+        existingPaths = new ArrayList<>();
+        existingPaths.add(ROOT);
+    }
+
+    @Override
+    public boolean mkdir(VirtualPath newFile) {
+        lock.writeLock().lock();
+        try {
+            if (existingPaths.contains(newFile)) {
+                return false;
+            } else {
+                if (existingPaths.contains(newFile.getParent())) {
+                    existingPaths.add(newFile);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean exists(VirtualPath virtualFile) {
+        lock.readLock().lock();
+        try {
+            return existingPaths.contains(virtualFile);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean delete(VirtualPath virtualFile) {
+        if (virtualFile.equals(ROOT)) { // Root cannot be deleted
+            return false;
+        }
+
+        lock.writeLock().lock();
+        try {
+            if (existingPaths.contains(virtualFile)) {
+                if (!hasSubDirectories(virtualFile)) {
+                    return existingPaths.remove(virtualFile);
+                }
+            }
+            return false;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private boolean hasSubDirectories(VirtualPath directory) {
+        return existingPaths.stream().anyMatch(e -> isChildOf(directory, e));
+    }
+
+    private boolean isChildOf(VirtualPath parent, VirtualPath childCandidate) {
+        if (childCandidate.equals(ROOT)) {
+            return false;
+        }
+        return parent.equals(childCandidate.getParent());
+    }
+
+    @Override
+    public List<VirtualPath> listChildren(VirtualPath parent) {
+        List<VirtualPath> children = new ArrayList<>();
+
+        lock.readLock().lock();
+        try {
+            if (parent.equals(ROOT)) {
+                for (VirtualPath existingPath : existingPaths) {

Review comment:
       Minor: With stream api this might be a little more readable and compact: filter and forAll would help

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/NotSupportedCommand.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+public class NotSupportedCommand extends AbstractCommand {

Review comment:
       I think, involving Ftp into the name would make the naming more precise. For AbstractCommand, I think FtpCommand would be fine as well. This naming is pretty general, making it hard to find the classes in a bigger project like NiFi. (For example: there is a different AbstractCommand in the project, which has way different purposes)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystemView.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualFileSystemView implements FileSystemView {
+
+    private static final Logger LOG = LoggerFactory.getLogger(VirtualFileSystemView.class);
+    private final VirtualFileSystem fileSystem;
+    private VirtualPath currentDirectory = VirtualFileSystem.ROOT;
+
+    public VirtualFileSystemView(User user, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (user == null || fileSystem == null) {

Review comment:
       Suggestion: Objects.requireNonNull

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+        try {
+            executeCommand(ftpSession, context, request);
+        } catch (FtpCommandException ftpCommandException) {
+            if (ftpCommandException.getSubId() == null) {
+                ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getBasicMessage()));
+            } else {
+                ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+                        ftpCommandException.getFtpReturnCode(),
+                        ftpCommandException.getSubId(),
+                        ftpCommandException.getBasicMessage(),
+                        ftpCommandException.getFtpFile()));
+            }
+        } finally {
+            ftpSession.resetState();
+            ftpSession.getDataConnection().closeDataConnection();
+        }
+    }
+
+    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+            throws FtpCommandException {
+
+        final String fileName = getArgument(request);
+
+        checkDataConnection(ftpSession);
+
+        final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+        checkWritePermission(ftpFile);
+
+        sendReturnCode150(ftpSession, context, request, ftpFile.getAbsolutePath());
+
+        final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
+
+        transferData(dataConnection, ftpSession, context, request, ftpFile);
+    }
+
+    private String getArgument(final FtpRequest request) throws FtpCommandException {
+        final String argument = request.getArgument();
+        if (argument == null) {
+            throw new FtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);
+        }
+        return argument;
+    }
+
+    private void checkDataConnection(final FtpIoSession ftpSession) throws FtpCommandException {
+        DataConnectionFactory dataConnectionFactory = ftpSession.getDataConnection();
+        if (dataConnectionFactory instanceof IODataConnectionFactory) {
+            InetAddress address = ((IODataConnectionFactory) dataConnectionFactory)
+                    .getInetAddress();
+            if (address == null) {
+                throw new FtpCommandException(FtpReply.REPLY_503_BAD_SEQUENCE_OF_COMMANDS, null, "PORT or PASV must be issued first", null);

Review comment:
       Minor: I am not perfectly sure, but I see two general patterns here: one where all of the arguments are used and one where subId and ftpFile are empty. I would suggest to think about using two separate kind of exceptions based on this differentiation.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandSTOR.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DataConnection;
+import org.apache.ftpserver.ftplet.DataConnectionFactory;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+import org.apache.ftpserver.impl.IODataConnectionFactory;
+import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
+import org.apache.ftpserver.impl.LocalizedFtpReply;
+import org.apache.ftpserver.impl.ServerFtpStatistics;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.ListenFTP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FtpCommandSTOR extends AbstractCommand {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
+    private final AtomicReference<ProcessSessionFactory> sessionFactory;
+    private final CountDownLatch sessionFactorySetSignal;
+
+    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        this.sessionFactory = sessionFactory;
+        this.sessionFactorySetSignal = sessionFactorySetSignal;
+    }
+
+    /**
+     * Execute command.
+     */
+    public void execute(final FtpIoSession ftpSession, final FtpServerContext context, final FtpRequest request) {
+        try {
+            executeCommand(ftpSession, context, request);
+        } catch (FtpCommandException ftpCommandException) {
+            if (ftpCommandException.getSubId() == null) {
+                ftpSession.write(new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getBasicMessage()));
+            } else {
+                ftpSession.write(LocalizedDataTransferFtpReply.translate(ftpSession, request, context,
+                        ftpCommandException.getFtpReturnCode(),
+                        ftpCommandException.getSubId(),
+                        ftpCommandException.getBasicMessage(),
+                        ftpCommandException.getFtpFile()));
+            }
+        } finally {
+            ftpSession.resetState();
+            ftpSession.getDataConnection().closeDataConnection();
+        }
+    }
+
+    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request)
+            throws FtpCommandException {
+
+        final String fileName = getArgument(request);
+
+        checkDataConnection(ftpSession);
+
+        final FtpFile ftpFile = getFtpFile(ftpSession, fileName);
+
+        checkWritePermission(ftpFile);
+
+        sendReturnCode150(ftpSession, context, request, ftpFile.getAbsolutePath());
+
+        final DataConnection dataConnection = openDataConnection(ftpSession, ftpFile);
+
+        transferData(dataConnection, ftpSession, context, request, ftpFile);
+    }
+
+    private String getArgument(final FtpRequest request) throws FtpCommandException {
+        final String argument = request.getArgument();
+        if (argument == null) {
+            throw new FtpCommandException(FtpReply.REPLY_501_SYNTAX_ERROR_IN_PARAMETERS_OR_ARGUMENTS, "STOR", null, null);

Review comment:
       Minor: Would be nicer to use the basicMessage (what if in an upper level of the stack someone would wish to log it?)

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestVirtualFileSystemView {
+
+    private FileSystemView fileSystemView;
+    private static VirtualFileSystem fileSystem;
+
+    @BeforeClass
+    public static void setupVirtualFileSystem() {
+        fileSystem = new DefaultVirtualFileSystem();
+        fileSystem.mkdir(new VirtualPath("/Directory1"));
+        fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1"));
+        fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"));
+        fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory2"));
+        fileSystem.mkdir(new VirtualPath("/Directory2"));
+        fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory3"));
+        fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory4"));
+    }
+
+    @Before
+    public void setup() throws FtpException {
+        User user = createUser();
+        FileSystemFactory fileSystemFactory = new VirtualFileSystemFactory(fileSystem);
+        fileSystemView = fileSystemFactory.createFileSystemView(user);
+    }
+
+    @Test
+    public void testInRootDirectory() throws FtpException {
+
+        // WHEN
+        // We do not change directories
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testTryToMakeRootDirectory() {
+
+        // WHEN
+        boolean directoryCreated = fileSystem.mkdir(VirtualFileSystem.ROOT);
+
+        // THEN
+        assertFalse(directoryCreated);
+    }
+
+    @Test
+    public void testChangeToAnotherDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/Directory1");
+    }
+
+    @Test
+    public void testChangeToRootDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("/");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToUnspecifiedDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToSameDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory(".");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToSameDirectoryNonRoot() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory(".");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/Directory1");
+    }
+
+    @Test
+    public void testChangeToParentDirectory() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("..");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToParentDirectoryNonRoot() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+        fileSystemView.changeWorkingDirectory("SubDirectory1");
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("..");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/Directory1");
+    }
+
+    @Test
+    public void testChangeToNonExistentDirectory() throws FtpException {
+        // GIVEN
+
+        // WHEN
+        boolean changeDirectoryResult = fileSystemView.changeWorkingDirectory("/Directory2/SubDirectory3/SubSubDirectory");
+
+        // THEN
+        assertFalse(changeDirectoryResult);
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testGetFileAbsolute() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
+
+        // WHEN
+        FtpFile file = fileSystemView.getFile("/Directory2/SubDirectory3");
+
+        // THEN
+        assertEquals("/Directory2/SubDirectory3", file.getAbsolutePath());
+    }
+
+    @Test
+    public void testGetFileNonAbsolute() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
+
+        // WHEN
+        FtpFile file = fileSystemView.getFile("SubSubDirectory");
+
+        // THEN
+        assertEquals("/Directory1/SubDirectory1/SubSubDirectory", file.getAbsolutePath());
+    }
+
+    private User createUser() {

Review comment:
       As the user object is not really utilised in many places, a mock would be simpler

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFileSystem.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.List;
+
+public interface VirtualFileSystem {

Review comment:
       Please add javadoc

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+    private final VirtualPath path;
+    private final VirtualFileSystem fileSystem;
+
+    public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (path == null || fileSystem == null) {

Review comment:
       Suggestion: Objects.requireNonNull

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class VirtualPath {
+
+    private final Path path; // always normalized
+
+    public VirtualPath(String path) {
+        String absolutePath = "/" + normalizeSeparator(path);

Review comment:
       What if path originally starts with "/"? Should not be checked if it is before adding that? (Even if this is handled correctly, this might have effect on unicity as '/something' and '//something' are not considered as identical with the current code

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class VirtualPath {
+
+    private final Path path; // always normalized
+
+    public VirtualPath(String path) {
+        String absolutePath = "/" + normalizeSeparator(path);
+        this.path = Paths.get(absolutePath).normalize();
+    }
+
+    private String normalizeSeparator(String path) {
+        String normalizedPath = path.replace(File.separatorChar, '/');
+        normalizedPath = normalizedPath.replace('\\', '/');
+        return normalizedPath;
+    }
+
+    public String getFileName() {
+        if (path.getFileName() == null) {
+            return "/";
+        } else {
+            return path.getFileName().toString();
+        }
+    }
+
+    public VirtualPath getParent() {
+        if (path.getParent() == null) {

Review comment:
       Minor: Optional.ofNullable(path.getFileName()).orElse(ROOT).toString() might be a little compact (if there is a ROOT constant with "/" as path.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/DefaultVirtualFileSystem.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DefaultVirtualFileSystem implements VirtualFileSystem {
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final List<VirtualPath> existingPaths;
+
+    public DefaultVirtualFileSystem() {
+        existingPaths = new ArrayList<>();
+        existingPaths.add(ROOT);
+    }
+
+    @Override
+    public boolean mkdir(VirtualPath newFile) {
+        lock.writeLock().lock();
+        try {
+            if (existingPaths.contains(newFile)) {
+                return false;
+            } else {
+                if (existingPaths.contains(newFile.getParent())) {
+                    existingPaths.add(newFile);
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean exists(VirtualPath virtualFile) {
+        lock.readLock().lock();
+        try {
+            return existingPaths.contains(virtualFile);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean delete(VirtualPath virtualFile) {
+        if (virtualFile.equals(ROOT)) { // Root cannot be deleted
+            return false;
+        }
+
+        lock.writeLock().lock();
+        try {
+            if (existingPaths.contains(virtualFile)) {
+                if (!hasSubDirectories(virtualFile)) {
+                    return existingPaths.remove(virtualFile);
+                }
+            }
+            return false;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private boolean hasSubDirectories(VirtualPath directory) {
+        return existingPaths.stream().anyMatch(e -> isChildOf(directory, e));
+    }
+
+    private boolean isChildOf(VirtualPath parent, VirtualPath childCandidate) {
+        if (childCandidate.equals(ROOT)) {
+            return false;
+        }
+        return parent.equals(childCandidate.getParent());
+    }
+
+    @Override
+    public List<VirtualPath> listChildren(VirtualPath parent) {
+        List<VirtualPath> children = new ArrayList<>();

Review comment:
       I am not sure that we need to put a fully developed file system behind the virtual file system, but have you considered using a tree structure? Algorithms like this would be much simpler

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+    private final VirtualPath path;
+    private final VirtualFileSystem fileSystem;
+
+    public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (path == null || fileSystem == null) {
+            throw new IllegalArgumentException("File path and fileSystem cannot be null");
+        }
+        this.path = path;
+        this.fileSystem = fileSystem;
+    }
+
+    @Override
+    public String getAbsolutePath() {
+        return path.toString();
+    }
+
+    @Override
+    public String getName() {
+        return path.getFileName();
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return true; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean isFile() {
+        return false; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean doesExist() {
+        return fileSystem.exists(path);
+    }
+
+    @Override
+    public boolean isReadable() {
+        return true;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return true;
+    }
+
+    @Override
+    public boolean isRemovable() {
+        return true; //Every virtual directory can be deleted
+    }
+
+    @Override
+    public String getOwnerName() {
+        return "Owner";
+    }
+
+    @Override
+    public String getGroupName() {
+        return "Group";
+    }
+
+    @Override
+    public int getLinkCount() {
+        return 1;
+    }
+
+    @Override
+    public long getLastModified() {
+        return Calendar.getInstance().getTimeInMillis();
+    }
+
+    @Override
+    public boolean setLastModified(long l) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.setLastModified()");
+    }
+
+    @Override
+    public long getSize() {
+        return 0;
+    }
+
+    @Override
+    public Object getPhysicalFile() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.getPhysicalFile()");
+    }
+
+    @Override
+    public boolean mkdir() {
+        return fileSystem.mkdir(path);
+    }
+
+    @Override
+    public boolean delete() {
+        return fileSystem.delete(path);
+    }
+
+    @Override
+    public boolean move(FtpFile ftpFile) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.move()");
+    }
+
+    @Override
+    public List<? extends FtpFile> listFiles() {
+        List<VirtualPath> paths = fileSystem.listChildren(path);
+        List<VirtualFtpFile> files = new ArrayList<>();
+        for (VirtualPath path : paths) {
+            files.add(new VirtualFtpFile(path, fileSystem));
+        }
+        return files;
+    }
+
+    @Override
+    public OutputStream createOutputStream(long l) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.createOutputStream()");
+    }
+
+    @Override
+    public InputStream createInputStream(long l) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.createInputStream()");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof VirtualFtpFile)) {
+            return false;
+        }
+        VirtualFtpFile other = (VirtualFtpFile) o;
+        return path.equals(other.path);

Review comment:
       Should not be the fileSystem involved? Do we consider the same path from two different file systems equal?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+    private final VirtualPath path;
+    private final VirtualFileSystem fileSystem;
+
+    public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (path == null || fileSystem == null) {
+            throw new IllegalArgumentException("File path and fileSystem cannot be null");
+        }
+        this.path = path;
+        this.fileSystem = fileSystem;
+    }
+
+    @Override
+    public String getAbsolutePath() {
+        return path.toString();
+    }
+
+    @Override
+    public String getName() {
+        return path.getFileName();
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return true; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean isFile() {
+        return false; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean doesExist() {
+        return fileSystem.exists(path);
+    }
+
+    @Override
+    public boolean isReadable() {
+        return true;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return true;
+    }
+
+    @Override
+    public boolean isRemovable() {
+        return true; //Every virtual directory can be deleted
+    }
+
+    @Override
+    public String getOwnerName() {
+        return "Owner";

Review comment:
       I am not sure it matters, but in case non-anonymous connection, would it not make sense to use that? Is there any scenario might depend on this?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestVirtualFileSystemView {

Review comment:
       Could you please add cases for the following scenarios?
   
   ```
   /Directory1/
   /Directory1//
   //Directory1
   //Directory1/SubDirectory1/../SubDirectory1
   //
   Directory1
   /Direc tory1 (the space is deliberate)
   \Directory1\SubDirectory1
   /űáú▣☃/SubDirectory1
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ftp/TestVirtualFileSystemView.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ftplet.FileSystemFactory;
+import org.apache.ftpserver.ftplet.FileSystemView;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.FtpFile;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualPath;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestVirtualFileSystemView {
+
+    private FileSystemView fileSystemView;
+    private static VirtualFileSystem fileSystem;
+
+    @BeforeClass
+    public static void setupVirtualFileSystem() {
+        fileSystem = new DefaultVirtualFileSystem();
+        fileSystem.mkdir(new VirtualPath("/Directory1"));
+        fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1"));
+        fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory1/SubSubDirectory"));
+        fileSystem.mkdir(new VirtualPath("/Directory1/SubDirectory2"));
+        fileSystem.mkdir(new VirtualPath("/Directory2"));
+        fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory3"));
+        fileSystem.mkdir(new VirtualPath("/Directory2/SubDirectory4"));
+    }
+
+    @Before
+    public void setup() throws FtpException {
+        User user = createUser();
+        FileSystemFactory fileSystemFactory = new VirtualFileSystemFactory(fileSystem);
+        fileSystemView = fileSystemFactory.createFileSystemView(user);
+    }
+
+    @Test
+    public void testInRootDirectory() throws FtpException {
+
+        // WHEN
+        // We do not change directories
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testTryToMakeRootDirectory() {
+
+        // WHEN
+        boolean directoryCreated = fileSystem.mkdir(VirtualFileSystem.ROOT);
+
+        // THEN
+        assertFalse(directoryCreated);
+    }
+
+    @Test
+    public void testChangeToAnotherDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/Directory1");
+    }
+
+    @Test
+    public void testChangeToRootDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("/");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToUnspecifiedDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToSameDirectory() throws FtpException {
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory(".");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToSameDirectoryNonRoot() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory(".");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/Directory1");
+    }
+
+    @Test
+    public void testChangeToParentDirectory() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("..");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testChangeToParentDirectoryNonRoot() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1");
+        fileSystemView.changeWorkingDirectory("SubDirectory1");
+
+        // WHEN
+        fileSystemView.changeWorkingDirectory("..");
+
+        // THEN
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/Directory1");
+    }
+
+    @Test
+    public void testChangeToNonExistentDirectory() throws FtpException {
+        // GIVEN
+
+        // WHEN
+        boolean changeDirectoryResult = fileSystemView.changeWorkingDirectory("/Directory2/SubDirectory3/SubSubDirectory");
+
+        // THEN
+        assertFalse(changeDirectoryResult);
+        assertHomeDirectory("/");
+        assertCurrentDirectory("/");
+    }
+
+    @Test
+    public void testGetFileAbsolute() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
+
+        // WHEN
+        FtpFile file = fileSystemView.getFile("/Directory2/SubDirectory3");
+
+        // THEN
+        assertEquals("/Directory2/SubDirectory3", file.getAbsolutePath());
+    }
+
+    @Test
+    public void testGetFileNonAbsolute() throws FtpException {
+        // GIVEN
+        fileSystemView.changeWorkingDirectory("/Directory1/SubDirectory1");
+
+        // WHEN
+        FtpFile file = fileSystemView.getFile("SubSubDirectory");
+
+        // THEN
+        assertEquals("/Directory1/SubDirectory1/SubSubDirectory", file.getAbsolutePath());
+    }
+
+    private User createUser() {
+        BaseUser user = new BaseUser();
+        user.setName("Username");
+        user.setPassword("Password");
+        user.setHomeDirectory("/abc/def");
+        user.setAuthorities(Collections.singletonList(new WritePermission()));
+        return user;
+    }
+
+    private void assertHomeDirectory(String expectedHomeDirectory) throws FtpException {

Review comment:
       Minor: it might increase expressiveness if an Equals or EqualsWith postfix would be added. The same is truth for the one below




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#issuecomment-679220528


   @pvillard31 the error was due to a method call that is not present in Java 8. Fixed that with the next commit.
   I'll check the FTPS possibility and let you know.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] simonbence commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483437039



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);

Review comment:
       I think this is something we do not have to prepare. Duplication because of possible future changes can lead into strange corners. I would think, no one will remove a validator without a strong reason and reviews




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r484907576



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/NotSupportedCommand.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+public class NotSupportedCommand extends AbstractCommand {

Review comment:
       AbstractCommand is managed under the ftpserver library thus cannot be renamed. NotSupportedCommand is under the ftp.commands package to make it easier to identify.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483647107



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandHELP;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandSTOR;
+import org.apache.nifi.processors.standard.ftp.commands.NotSupportedCommand;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer {
+
+    private final Map<String, Command> commandMap = new HashMap<>();
+    private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+
+    private final FtpServer server;
+    private static final String HOME_DIRECTORY = "/virtual/ftproot";
+
+    private NifiFtpServer(Builder builder) throws ProcessException {
+        try {
+            initializeCommandMap(builder.sessionFactory, builder.sessionFactorySetSignal);
+
+            VirtualFileSystem fileSystem = new DefaultVirtualFileSystem();
+            boolean anonymousLoginEnabled = (builder.username == null);
+
+            FtpServerFactory serverFactory = new FtpServerFactory();
+            serverFactory.setFileSystem(new VirtualFileSystemFactory(fileSystem));
+            serverFactory.setCommandFactory(createCommandFactory(commandMap));
+            serverFactory.setConnectionConfig(createConnectionConfig(anonymousLoginEnabled));
+            serverFactory.addListener("default", createListener(builder.bindAddress, builder.port));
+            if (anonymousLoginEnabled) {
+                serverFactory.getUserManager().save(createAnonymousUser(HOME_DIRECTORY, Collections.singletonList(new WritePermission())));
+            } else {
+                serverFactory.getUserManager().save(createUser(builder.username, builder.password, HOME_DIRECTORY, Collections.singletonList(new WritePermission())));
+            }
+            server = serverFactory.createServer();
+        } catch (Exception exception) {
+            throw new ProcessException("FTP server could not be started.", exception);
+        }
+    }
+
+    public void start() throws ProcessException {
+        try {
+            server.start();
+        } catch (Exception exception) {
+            throw new ProcessException("FTP server could not be started.", exception);
+        }
+    }
+
+    public void stop() {
+        server.stop();
+    }
+
+    public boolean isStopped() {
+        return server.isStopped();
+    }
+
+    private CommandFactory createCommandFactory(Map<String, Command> commandMap) {
+        CommandFactoryFactory commandFactoryFactory = new CommandFactoryFactory();
+        commandFactoryFactory.setUseDefaultCommands(false);
+        commandFactoryFactory.setCommandMap(commandMap);
+        return commandFactoryFactory.createCommandFactory();
+    }
+
+    private ConnectionConfig createConnectionConfig(boolean anonymousLoginEnabled) {
+        ConnectionConfigFactory connectionConfigFactory = new ConnectionConfigFactory();
+        connectionConfigFactory.setAnonymousLoginEnabled(anonymousLoginEnabled);
+        return connectionConfigFactory.createConnectionConfig();
+    }
+
+    private Listener createListener(String bindAddress, int port) throws FtpServerConfigurationException {
+        ListenerFactory listenerFactory = new ListenerFactory();
+        listenerFactory.setServerAddress(bindAddress);
+        listenerFactory.setPort(port);
+        return listenerFactory.createListener();
+    }
+
+    private User createUser(String username, String password, String homeDirectory, List<Authority> authorities) {
+        BaseUser user = new BaseUser();
+        user.setName(username);
+        user.setPassword(password);
+        user.setHomeDirectory(homeDirectory);
+        user.setAuthorities(authorities);
+        return user;
+    }
+
+    private User createAnonymousUser(String homeDirectory, List<Authority> authorities) {
+        BaseUser user = new BaseUser();
+        user.setName("anonymous");
+        user.setHomeDirectory(homeDirectory);
+        user.setAuthorities(authorities);
+        return user;
+    }
+
+    private void initializeCommandMap(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal) {
+        addToCommandMap("ABOR", new ABOR());
+        addToCommandMap("ACCT", new NotSupportedCommand("Operation (ACCT) not supported."));
+        addToCommandMap("APPE", new NotSupportedCommand("Operation (APPE) not supported."));
+        addToCommandMap("AUTH", new AUTH());
+        addToCommandMap("CDUP", new CDUP());
+        addToCommandMap("CWD", new CWD());
+        addToCommandMap("DELE", new NotSupportedCommand("Operation (DELE) not supported."));
+        addToCommandMap("EPRT", new EPRT());
+        addToCommandMap("EPSV", new EPSV());
+        addToCommandMap("FEAT", new FEAT());
+        addToCommandMap("HELP", customHelpCommand);
+        addToCommandMap("LIST", new LIST());
+        addToCommandMap("MFMT", new NotSupportedCommand("Operation (MFMT) not supported."));
+        addToCommandMap("MDTM", new MDTM());
+        addToCommandMap("MLST", new MLST());
+        addToCommandMap("MKD", new MKD());
+        addToCommandMap("MLSD", new MLSD());
+        addToCommandMap("MODE", new MODE());
+        addToCommandMap("NLST", new NLST());
+        addToCommandMap("NOOP", new NOOP());
+        addToCommandMap("OPTS", new OPTS());
+        addToCommandMap("PASS", new PASS());
+        addToCommandMap("PASV", new PASV());
+        addToCommandMap("PBSZ", new PBSZ());
+        addToCommandMap("PORT", new PORT());
+        addToCommandMap("PROT", new PROT());
+        addToCommandMap("PWD", new PWD());
+        addToCommandMap("QUIT", new QUIT());
+        addToCommandMap("REIN", new REIN());
+        addToCommandMap("REST", new NotSupportedCommand("Operation (REST) not supported."));
+        addToCommandMap("RETR", new NotSupportedCommand("Operation (RETR) not supported."));
+        addToCommandMap("RMD", new RMD());
+        //addToCommandMap("RNFR", new RNFR());

Review comment:
       For now, this will not be available. Removed the comment and changed the implementation to use NotSupportedCommand.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r484858773



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/commands/FtpCommandHELP.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.commands;
+
+import org.apache.ftpserver.command.AbstractCommand;
+import org.apache.ftpserver.ftplet.DefaultFtpReply;
+import org.apache.ftpserver.ftplet.FtpReply;
+import org.apache.ftpserver.ftplet.FtpRequest;
+import org.apache.ftpserver.impl.FtpIoSession;
+import org.apache.ftpserver.impl.FtpServerContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FtpCommandHELP extends AbstractCommand {
+
+    private static Map<String, String> COMMAND_SPECIFIC_HELP;
+    private Set<String> availableCommands = new TreeSet<>();
+
+    static {
+        Map<String, String> commands = new HashMap<>();

Review comment:
       I completely agree, and I would not have implemented this complicated way of setting up the help command, if it was not necessary. The reason is simple: AbstractCommand is defined in the ftpserver library and is not managed by me. Since I do not override all the ftp commands, and the original commands inherit from AbstractCommand, the custom commands also need to inherit from that. In the original HELP command in the library, the help text was hard-coded. This is more complicated, but it is dynamic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483103711



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("listening-port")
+            .displayName("Listening Port")
+            .description("The Port to listen on for incoming connections. On Linux, root privileges are required to use port numbers below 1024.")
+            .required(true)
+            .defaultValue("2221")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("username")
+            .displayName("Username")
+            .description("The name of the user that is allowed to log in to the FTP server. " +
+                    "If a username is provided, a password must also be provided. " +
+                    "If no username is specified, anonymous connections will be permitted.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("If a Username is specified, then a password must also be specified. " +
+                    "The password provided by the client trying to log in to the FTP server will be checked against this password.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            BIND_ADDRESS,
+            PORT,
+            USERNAME,
+            PASSWORD
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Collections.singletonList(
+            RELATIONSHIP_SUCCESS
+    )));
+
+    private volatile NifiFtpServer ftpServer;
+    private volatile CountDownLatch sessionFactorySetSignal;
+    private final AtomicReference<ProcessSessionFactory> sessionFactory = new AtomicReference<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void startFtpServer(ProcessContext context) {
+        if (ftpServer == null) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            String bindAddress = context.getProperty(BIND_ADDRESS).evaluateAttributeExpressions().getValue();
+            int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+
+            try {
+                sessionFactorySetSignal = new CountDownLatch(1);
+                ftpServer = new NifiFtpServer.Builder()
+                        .sessionFactory(sessionFactory)
+                        .sessionFactorySetSignal(sessionFactorySetSignal)
+                        .bindAddress(bindAddress)
+                        .port(port)
+                        .username(username)
+                        .password(password)
+                        .build();
+                ftpServer.start();
+            } catch (ProcessException processException) {
+                getLogger().error(processException.getMessage(), processException);
+                stopFtpServer();
+                throw processException;
+            }
+        } else {
+            getLogger().warn("Ftp server already started.");
+        }
+    }
+
+    @OnStopped
+    public void stopFtpServer() {
+        if (ftpServer != null && !ftpServer.isStopped()) {
+            ftpServer.stop();
+        }
+        ftpServer = null;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.sessionFactory.compareAndSet(null, sessionFactory)) {
+            sessionFactorySetSignal.countDown();
+        }
+        context.yield();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> results = new ArrayList<>(2);
+        String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+        String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if ((username == null) && (password != null)) {
+            results.add(usernameOrPasswordIsNull(USERNAME, PASSWORD));
+        } else if ((username != null) && (password == null)) {
+            results.add(usernameOrPasswordIsNull(PASSWORD, USERNAME));
+        } else if ((username != null) && (password != null)) {
+            validateAgainstEmptyString(username, USERNAME, results);
+            validateAgainstEmptyString(password, PASSWORD, results);
+        }
+        return results;
+    }
+
+    private ValidationResult usernameOrPasswordIsNull(PropertyDescriptor nullProperty, PropertyDescriptor nonNullProperty) {
+        String explanation = String.format("'%s' and '%s' should either both be provided or none of them", nullProperty.getDisplayName(), nonNullProperty.getDisplayName());
+        return createValidationResult(nullProperty.getDisplayName(), explanation);
+    }
+
+    private void validateAgainstEmptyString(String propertyValue, PropertyDescriptor property, Collection<ValidationResult> validationResults) {
+        if (propertyValue.isBlank()) {
+            if (propertyValue.isEmpty()) {
+                validationResults.add(propertyIsEmptyString(property));
+            } else {
+                validationResults.add(propertyContainsOnlyWhitespace(property));
+            }
+        }
+    }
+
+    private ValidationResult propertyIsEmptyString(PropertyDescriptor property) {
+        String explanation = String.format("'%s' cannot be an empty string", property.getDisplayName());
+        return createValidationResult(property.getDisplayName(), explanation);
+    }
+
+    private ValidationResult propertyContainsOnlyWhitespace(PropertyDescriptor property) {

Review comment:
       As mentioned in a reply to one of the previous comments, the NON_BLANK_VALIDATOR does this validation, actually.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r483068199



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenFTP.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.ftp.NifiFtpServer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"ingest", "ftp", "listen"})
+@CapabilityDescription("Starts an FTP Server and listens on a given port to transform incoming files into FlowFiles. "
+        + "The URI of the Service will be ftp://{hostname}:{port}. The default port is 2221.")
+public class ListenFTP extends AbstractSessionFactoryProcessor {
+
+    public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Relationship for successfully received files")
+            .build();
+
+    public static final PropertyDescriptor BIND_ADDRESS = new PropertyDescriptor.Builder()
+            .name("bind-address")
+            .displayName("Bind Address")
+            .description("The address the FTP server should be bound to. If not provided, the server binds to all available addresses.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)

Review comment:
       A URI_VALIDATOR might not be the perfect solution here, as it is later used as an InetAddress. But it's a good point. I extended the custom validator for the processor to also validate this property. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r484541741



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/NifiFtpServer.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp;
+
+import org.apache.ftpserver.ConnectionConfig;
+import org.apache.ftpserver.ConnectionConfigFactory;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerConfigurationException;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.command.Command;
+import org.apache.ftpserver.command.CommandFactory;
+import org.apache.ftpserver.command.CommandFactoryFactory;
+import org.apache.ftpserver.command.impl.ABOR;
+import org.apache.ftpserver.command.impl.AUTH;
+import org.apache.ftpserver.command.impl.CDUP;
+import org.apache.ftpserver.command.impl.CWD;
+import org.apache.ftpserver.command.impl.EPRT;
+import org.apache.ftpserver.command.impl.EPSV;
+import org.apache.ftpserver.command.impl.FEAT;
+import org.apache.ftpserver.command.impl.LIST;
+import org.apache.ftpserver.command.impl.MDTM;
+import org.apache.ftpserver.command.impl.MKD;
+import org.apache.ftpserver.command.impl.MLSD;
+import org.apache.ftpserver.command.impl.MLST;
+import org.apache.ftpserver.command.impl.MODE;
+import org.apache.ftpserver.command.impl.NLST;
+import org.apache.ftpserver.command.impl.NOOP;
+import org.apache.ftpserver.command.impl.OPTS;
+import org.apache.ftpserver.command.impl.PASS;
+import org.apache.ftpserver.command.impl.PASV;
+import org.apache.ftpserver.command.impl.PBSZ;
+import org.apache.ftpserver.command.impl.PORT;
+import org.apache.ftpserver.command.impl.PROT;
+import org.apache.ftpserver.command.impl.PWD;
+import org.apache.ftpserver.command.impl.QUIT;
+import org.apache.ftpserver.command.impl.REIN;
+import org.apache.ftpserver.command.impl.RMD;
+import org.apache.ftpserver.command.impl.SITE;
+import org.apache.ftpserver.command.impl.SITE_DESCUSER;
+import org.apache.ftpserver.command.impl.SITE_HELP;
+import org.apache.ftpserver.command.impl.SITE_STAT;
+import org.apache.ftpserver.command.impl.SITE_WHO;
+import org.apache.ftpserver.command.impl.SITE_ZONE;
+import org.apache.ftpserver.command.impl.SIZE;
+import org.apache.ftpserver.command.impl.STAT;
+import org.apache.ftpserver.command.impl.STRU;
+import org.apache.ftpserver.command.impl.SYST;
+import org.apache.ftpserver.command.impl.TYPE;
+import org.apache.ftpserver.command.impl.USER;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.User;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandHELP;
+import org.apache.nifi.processors.standard.ftp.commands.FtpCommandSTOR;
+import org.apache.nifi.processors.standard.ftp.commands.NotSupportedCommand;
+import org.apache.nifi.processors.standard.ftp.filesystem.DefaultVirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystem;
+import org.apache.nifi.processors.standard.ftp.filesystem.VirtualFileSystemFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NifiFtpServer {
+
+    private final Map<String, Command> commandMap = new HashMap<>();
+    private final FtpCommandHELP customHelpCommand = new FtpCommandHELP();
+
+    private final FtpServer server;

Review comment:
       That's right, thanks! A CommandMapFactory has been created to take care of assembling the command map. Moved most of the preparation steps to the Builder to make the NifiFtpServer cleaner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pvillard31 commented on pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#issuecomment-694146447


   Looked at the changes to support FTPS, it looks good to me. Merging to the main branch, thanks @pgyori and thanks @simonbence for the extensive review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r485790932



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualFtpFile.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import org.apache.ftpserver.ftplet.FtpFile;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+public class VirtualFtpFile implements FtpFile {
+
+    private final VirtualPath path;
+    private final VirtualFileSystem fileSystem;
+
+    public VirtualFtpFile(VirtualPath path, VirtualFileSystem fileSystem) throws IllegalArgumentException {
+        if (path == null || fileSystem == null) {
+            throw new IllegalArgumentException("File path and fileSystem cannot be null");
+        }
+        this.path = path;
+        this.fileSystem = fileSystem;
+    }
+
+    @Override
+    public String getAbsolutePath() {
+        return path.toString();
+    }
+
+    @Override
+    public String getName() {
+        return path.getFileName();
+    }
+
+    @Override
+    public boolean isHidden() {
+        return false;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return true; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean isFile() {
+        return false; // Only directories are handled since files are converted into flowfiles immediately.
+    }
+
+    @Override
+    public boolean doesExist() {
+        return fileSystem.exists(path);
+    }
+
+    @Override
+    public boolean isReadable() {
+        return true;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return true;
+    }
+
+    @Override
+    public boolean isRemovable() {
+        return true; //Every virtual directory can be deleted
+    }
+
+    @Override
+    public String getOwnerName() {
+        return "Owner";
+    }
+
+    @Override
+    public String getGroupName() {
+        return "Group";
+    }
+
+    @Override
+    public int getLinkCount() {
+        return 1;
+    }
+
+    @Override
+    public long getLastModified() {
+        return Calendar.getInstance().getTimeInMillis();
+    }
+
+    @Override
+    public boolean setLastModified(long l) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.setLastModified()");
+    }
+
+    @Override
+    public long getSize() {
+        return 0;
+    }
+
+    @Override
+    public Object getPhysicalFile() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.getPhysicalFile()");
+    }
+
+    @Override
+    public boolean mkdir() {
+        return fileSystem.mkdir(path);
+    }
+
+    @Override
+    public boolean delete() {
+        return fileSystem.delete(path);
+    }
+
+    @Override
+    public boolean move(FtpFile ftpFile) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.move()");
+    }
+
+    @Override
+    public List<? extends FtpFile> listFiles() {
+        List<VirtualPath> paths = fileSystem.listChildren(path);
+        List<VirtualFtpFile> files = new ArrayList<>();
+        for (VirtualPath path : paths) {
+            files.add(new VirtualFtpFile(path, fileSystem));
+        }
+        return files;
+    }
+
+    @Override
+    public OutputStream createOutputStream(long l) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.createOutputStream()");
+    }
+
+    @Override
+    public InputStream createInputStream(long l) throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("VirtualFtpFile.createInputStream()");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof VirtualFtpFile)) {
+            return false;
+        }
+        VirtualFtpFile other = (VirtualFtpFile) o;
+        return path.equals(other.path);

Review comment:
       That's right, fileSystem should also be considered. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on a change in pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on a change in pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#discussion_r485845732



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ftp/filesystem/VirtualPath.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.ftp.filesystem;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class VirtualPath {
+
+    private final Path path; // always normalized
+
+    public VirtualPath(String path) {
+        String absolutePath = "/" + normalizeSeparator(path);

Review comment:
       That's right, '/something' and '//something' not being considered identical was a bug. Fixed it. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] asfgit closed pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4481:
URL: https://github.com/apache/nifi/pull/4481


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] pgyori commented on pull request #4481: NIFI-7624: ListenFTP processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on pull request #4481:
URL: https://github.com/apache/nifi/pull/4481#issuecomment-683974569


   @pvillard31 , I committed the changes that enable using SSL, with an SSL context service.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org