You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/25 16:17:40 UTC

[1/5] nifi git commit: NIFI-673: Added sftp.listing.user attribute to FlowFiles created by ListSFTP; ensure that FetchSFTP indicates that the username supports Expression Language

Repository: nifi
Updated Branches:
  refs/heads/master 8a8006085 -> 385bfbb2c


NIFI-673: Added sftp.listing.user attribute to FlowFiles created by ListSFTP; ensure that FetchSFTP indicates that the username supports Expression Language


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e382880
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e382880
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e382880

Branch: refs/heads/master
Commit: 4e38288062f33ecafdfd3b4bc453bdffe54e752e
Parents: b0322d9
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 23 16:13:52 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:13:02 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/FetchFileTransfer.java   | 7 +++++++
 .../java/org/apache/nifi/processors/standard/FetchSFTP.java  | 2 +-
 .../apache/nifi/processors/standard/ListFileTransfer.java    | 8 ++++++++
 .../java/org/apache/nifi/processors/standard/ListSFTP.java   | 3 ++-
 4 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4e382880/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index ab0be78..a405afb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -77,6 +77,13 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         .expressionLanguageSupported(true)
         .required(true)
         .build();
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+        .name("Username")
+        .description("Username")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
     public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder()
         .name("Remote File")
         .description("The fully qualified filename on the remote system")

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e382880/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index ad81c83..b95d864 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -53,7 +53,7 @@ public class FetchSFTP extends FetchFileTransfer {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(HOSTNAME);
         properties.add(port);
-        properties.add(SFTPTransfer.USERNAME);
+        properties.add(USERNAME);
         properties.add(SFTPTransfer.PASSWORD);
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
         properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e382880/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index 1176fd0..b6c8c28 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -45,6 +45,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
         .expressionLanguageSupported(true)
         .required(true)
         .build();
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+        .name("Username")
+        .description("Username")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
     public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
         .name("Remote Path")
         .description("The path on the remote system from which to pull or push files")
@@ -64,6 +71,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
         attributes.put("file.group", fileInfo.getGroup());
         attributes.put("file.permissions", fileInfo.getPermissions());
         attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
+        attributes.put(getProtocolName() + ".listing.user", context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
 
         final String fullPath = fileInfo.getFullPathFileName();
         if (fullPath != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e382880/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 925e5f8..7226263 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -39,6 +39,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
 @WritesAttributes({
     @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
     @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was connected to on the SFTP Server"),
+    @WritesAttribute(attribute = "sftp.listing.user", description = "The username of the user that performed the SFTP Listing"),
     @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
     @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
     @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@@ -54,7 +55,7 @@ public class ListSFTP extends ListFileTransfer {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(HOSTNAME);
         properties.add(port);
-        properties.add(SFTPTransfer.USERNAME);
+        properties.add(USERNAME);
         properties.add(SFTPTransfer.PASSWORD);
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
         properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);


[2/5] nifi git commit: NIFI-673: Added Completion Strategy to FetchSFTP

Posted by ma...@apache.org.
NIFI-673: Added Completion Strategy to FetchSFTP


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b0322d9f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b0322d9f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b0322d9f

Branch: refs/heads/master
Commit: b0322d9ffe8d117aae4faf7dd3e2881a28940f96
Parents: d1d5793
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 5 16:11:40 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:13:02 2015 -0400

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         |  38 +++---
 .../processors/standard/FetchFileTransfer.java  |  66 ++++++++--
 .../nifi/processors/standard/FetchSFTP.java     |  15 ++-
 .../processors/standard/ListFileTransfer.java   |   8 ++
 .../nifi/processors/standard/ListSFTP.java      |   7 +-
 .../processors/standard/util/EntityListing.java |   2 +-
 .../processors/standard/util/FTPTransfer.java   |  26 ++--
 .../processors/standard/util/FileTransfer.java  |   2 +
 .../processors/standard/util/SFTPTransfer.java  |  17 +++
 .../standard/TestFetchFileTransfer.java         | 131 +++++++++++++++++++
 10 files changed, 265 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 8a7fade..e592483 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -70,23 +70,25 @@ import org.codehaus.jackson.map.ObjectMapper;
  *
  * <p>
  * In order to make use of this abstract class, the entities listed must meet the following criteria:
- * <ul>
- * <li>
- * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
- * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
- * </li>
- * <li>
- * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
- * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
- * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
- * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
- * seen already.
- * </li>
- * <li>
- * Entity must have a user-readable name that can be used for logging purposes.
- * </li>
  * </p>
  *
+ * <ul>
+ *  <li>
+ *      Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
+ *      returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
+ *  </li>
+ *  <li>
+ *      Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
+ *      new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+ *      than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
+ *      identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
+ *      seen already.
+ *  </li>
+ *  <li>
+ *      Entity must have a user-readable name that can be used for logging purposes.
+ *  </li>
+ * </ul>
+ *
  * <p>
  * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
  * two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
@@ -111,6 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper;
  *
  * <p>
  * Subclasses are responsible for the following:
+ * </p>
  *
  * <ul>
  * <li>
@@ -134,7 +137,6 @@ import org.codehaus.jackson.map.ObjectMapper;
  * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
  * </li>
  * </ul>
- * </p>
  */
 @TriggerSerially
 public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
@@ -372,8 +374,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         int listCount = 0;
         Long latestListingTimestamp = null;
         for (final T entity : entityList) {
-            final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp ||
-                (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
+            final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp
+                || (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
 
             // Create the FlowFile for this path.
             if (list) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 5eecac3..ab0be78 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -31,7 +31,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -49,9 +51,18 @@ import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
 
 /**
- * A base class for FetchSFTP, FetchFTP processors
+ * A base class for FetchSFTP, FetchFTP processors.
+ *
+ * Note that implementations of this class should never use the @SupportsBatching annotation! Doing so
+ * could result in data loss!
  */
 public abstract class FetchFileTransfer extends AbstractProcessor {
+
+    static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
+    static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
+    static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
+
+
     static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
         .name("Hostname")
         .description("The fully-qualified hostname or IP address of the host to fetch the data from")
@@ -73,13 +84,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(true)
         .build();
-    public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
-        .name("Delete Original")
-        .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
-        .defaultValue("true")
-        .allowableValues("true", "false")
+    static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Completion Strategy")
+        .description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be "
+            + "logged but the data will still be transferred.")
+        .expressionLanguageSupported(false)
+        .allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE)
+        .defaultValue(COMPLETION_NONE.getValue())
         .required(true)
         .build();
+    static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
+        .name("Move Destination Directory")
+        .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
+            + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
+            + "the remote system, or the rename will fail.")
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
+
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -156,7 +179,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         properties.add(HOSTNAME);
         properties.add(UNDEFAULTED_PORT);
         properties.add(REMOTE_FILENAME);
-        properties.add(DELETE_ORIGINAL);
+        properties.add(COMPLETION_STRATEGY);
+        properties.add(MOVE_DESTINATION_DIR);
         return properties;
     }
 
@@ -203,6 +227,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         final InputStream in;
         try {
             in = transfer.getInputStream(filename, flowFile);
+
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException {
@@ -250,15 +275,34 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
             stopWatch.getElapsed(TimeUnit.MILLISECONDS));
         session.transfer(flowFile, REL_SUCCESS);
 
-        // delete remote file is necessary
-        final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean();
-        if (deleteOriginal) {
+        // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
+        // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
+        // result in data loss! If we commit the session first, we are safe.
+        session.commit();
+
+        final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
+        if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
             try {
                 transfer.deleteFile(null, filename);
             } catch (final FileNotFoundException e) {
                 // file doesn't exist -- effectively the same as removing it. Move on.
             } catch (final IOException ioe) {
-                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe);
+                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
+                    new Object[] {flowFile, host, port, filename, ioe}, ioe);
+            }
+        } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
+            String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
+            if (!targetDir.endsWith("/")) {
+                targetDir = targetDir + "/";
+            }
+            final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
+            final String target = targetDir + simpleFilename;
+
+            try {
+                transfer.rename(filename, target);
+            } catch (final IOException ioe) {
+                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
+                    new Object[] {flowFile, host, port, filename, ioe}, ioe);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 6387e19..ad81c83 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,8 +34,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 
-
-@SupportsBatching
+// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
 @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
 @CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
 @SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
@@ -50,15 +48,18 @@ public class FetchSFTP extends FetchFileTransfer {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(FetchFileTransfer.HOSTNAME);
-        properties.add(SFTPTransfer.PORT);
+        properties.add(HOSTNAME);
+        properties.add(port);
         properties.add(SFTPTransfer.USERNAME);
         properties.add(SFTPTransfer.PASSWORD);
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
         properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
-        properties.add(FetchFileTransfer.REMOTE_FILENAME);
-        properties.add(SFTPTransfer.DELETE_ORIGINAL);
+        properties.add(REMOTE_FILENAME);
+        properties.add(COMPLETION_STRATEGY);
+        properties.add(MOVE_DESTINATION_DIR);
         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
         properties.add(SFTPTransfer.DATA_TIMEOUT);
         properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index d6e1cd1..1176fd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -38,6 +38,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
         .required(true)
         .expressionLanguageSupported(true)
         .build();
+    static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
+        .name("Port")
+        .description("The port to connect to on the remote host to fetch the data from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
     public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
         .name("Remote Path")
         .description("The path on the remote system from which to pull or push files")
@@ -52,6 +59,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
     protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        attributes.put(getProtocolName() + ".remote.port", context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
         attributes.put("file.owner", fileInfo.getOwner());
         attributes.put("file.group", fileInfo.getGroup());
         attributes.put("file.permissions", fileInfo.getPermissions());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 3b6b69e..925e5f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
 @SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
 @WritesAttributes({
     @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
+    @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was connected to on the SFTP Server"),
     @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
     @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
     @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@@ -48,9 +49,11 @@ public class ListSFTP extends ListFileTransfer {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(SFTPTransfer.HOSTNAME);
-        properties.add(SFTPTransfer.PORT);
+        properties.add(HOSTNAME);
+        properties.add(port);
         properties.add(SFTPTransfer.USERNAME);
         properties.add(SFTPTransfer.PASSWORD);
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
index 56489f0..2d9525f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
@@ -61,7 +61,7 @@ public class EntityListing {
     /**
      * Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was
      * equal to {@link #getLatestTimestamp()}
-     * 
+     *
      * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp
      */
     public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 7f659d4..a038eb7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
                 client.disconnect();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex);
+            logger.warn("Failed to close FTPClient due to {}", new Object[] {ex.toString()}, ex);
         }
         client = null;
     }
@@ -334,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
         final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
 
         if (!cdSuccessful) {
-            logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory });
+            logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
             if (client.makeDirectory(remoteDirectory)) {
-                logger.debug("Created {}", new Object[] { remoteDirectory });
+                logger.debug("Created {}", new Object[] {remoteDirectory});
             } else {
                 throw new IOException("Failed to create remote directory " + remoteDirectory);
             }
@@ -392,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
                 final String time = outformat.format(fileModifyTime);
                 if (!client.setModificationTime(tempFilename, time)) {
                     // FTP server probably doesn't support MFMT command
-                    logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime });
+                    logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] {flowFile, lastModifiedTime});
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e });
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {flowFile, lastModifiedTime, e});
             }
         }
         final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@@ -404,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
                 int perms = numberPermissions(permissions);
                 if (perms >= 0) {
                     if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
-                        logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions });
+                        logger.warn("Could not set permission on {} to {}", new Object[] {flowFile, permissions});
                     }
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e });
+                logger.error("Failed to set permission on {} to {} due to {}", new Object[] {flowFile, permissions, e});
             }
         }
 
         if (!filename.equals(tempFilename)) {
             try {
-                logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile });
+                logger.debug("Renaming remote path from {} to {} for {}", new Object[] {tempFilename, filename, flowFile});
                 final boolean renameSuccessful = client.rename(tempFilename, filename);
                 if (!renameSuccessful) {
                     throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@@ -432,6 +432,16 @@ public class FTPTransfer implements FileTransfer {
         return fullPath;
     }
 
+
+    @Override
+    public void rename(final String source, final String target) throws IOException {
+        final FTPClient client = getClient(null);
+        final boolean renameSuccessful = client.rename(source, target);
+        if (!renameSuccessful) {
+            throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
+        }
+    }
+
     @Override
     public void deleteFile(final String path, final String remoteFileName) throws IOException {
         final FTPClient client = getClient(null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index fe277df..8d48de2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -43,6 +43,8 @@ public interface FileTransfer extends Closeable {
 
     String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
 
+    void rename(String source, String target) throws IOException;
+
     void deleteFile(String path, String remoteFileName) throws IOException;
 
     void deleteDirectory(String remoteDirectoryName) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index c28f275..9bad520 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -603,6 +603,23 @@ public class SFTPTransfer implements FileTransfer {
         return fullPath;
     }
 
+    @Override
+    public void rename(final String source, final String target) throws IOException {
+        final ChannelSftp sftp = getChannel(null);
+        try {
+            sftp.rename(source, target);
+        } catch (final SftpException e) {
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException();
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
+                default:
+                    throw new IOException(e);
+            }
+        }
+    }
+
     protected int numberPermissions(String perms) {
         int number = -1;
         final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 7aa8f9c..4175a77 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -92,8 +94,119 @@ public class TestFetchFileTransfer {
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
     }
 
+
+    @Test
+    public void testMoveFileWithNoTrailingSlashDirName() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+
+        proc.fileContents.containsKey("/moved/hello.txt");
+        assertEquals(1, proc.fileContents.size());
+    }
+
+    @Test
+    public void testMoveFileWithTrailingSlashDirName() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+
+        proc.fileContents.containsKey("/moved/hello.txt");
+        assertEquals(1, proc.fileContents.size());
+    }
+
+    @Test
+    public void testDeleteFile() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertTrue(proc.fileContents.isEmpty());
+    }
+
+    @Test
+    public void testDeleteFails() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        proc.allowDelete = false;
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertFalse(proc.fileContents.isEmpty());
+    }
+
+    @Test
+    public void testRenameFails() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        proc.allowDelete = false;
+        proc.allowRename = false;
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertEquals(1, proc.fileContents.size());
+
+        assertTrue(proc.fileContents.containsKey("hello.txt"));
+    }
+
+
     private static class TestableFetchFileTransfer extends FetchFileTransfer {
         private boolean allowAccess = true;
+        private boolean allowDelete = true;
+        private boolean allowRename = true;
         private boolean closed = false;
         private final Map<String, byte[]> fileContents = new HashMap<>();
 
@@ -154,6 +267,10 @@ public class TestFetchFileTransfer {
 
                 @Override
                 public void deleteFile(String path, String remoteFileName) throws IOException {
+                    if (!allowDelete) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
+
                     if (!fileContents.containsKey(remoteFileName)) {
                         throw new FileNotFoundException();
                     }
@@ -162,6 +279,20 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
+                public void rename(String source, String target) throws IOException {
+                    if (!allowRename) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
+
+                    if (!fileContents.containsKey(source)) {
+                        throw new FileNotFoundException();
+                    }
+
+                    final byte[] content = fileContents.remove(source);
+                    fileContents.put(target, content);
+                }
+
+                @Override
                 public void deleteDirectory(String remoteDirectoryName) throws IOException {
 
                 }


[5/5] nifi git commit: NIFI-673: Rebased from master; Added InputRequirement annotation, as it is now merged into master

Posted by ma...@apache.org.
NIFI-673: Rebased from master; Added InputRequirement annotation, as it is now merged into master


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/385bfbb2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/385bfbb2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/385bfbb2

Branch: refs/heads/master
Commit: 385bfbb2c635f74aca318617175cd57a8e26ecda
Parents: 4e38288
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 25 11:14:11 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:14:11 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/standard/FetchSFTP.java  | 3 +++
 .../main/java/org/apache/nifi/processors/standard/ListSFTP.java   | 3 +++
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/385bfbb2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index b95d864..8379987 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -22,6 +22,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,6 +37,7 @@ import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 
 // Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
+@InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
 @CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
 @SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})

http://git-wip-us.apache.org/repos/asf/nifi/blob/385bfbb2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 7226263..609b693 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -20,6 +20,8 @@ package org.apache.nifi.processors.standard;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -32,6 +34,7 @@ import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 
 @TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"})
 @CapabilityDescription("Performs a listing of the files residing on an SFTP server. For each file that is found on the remote server, a new FlowFile will be created with the filename attribute "
     + "set to the name of the file on the remote server. This can then be used in conjunction with FetchSFTP in order to fetch those files.")


[3/5] nifi git commit: NIFI-673: Initial implementation of ListSFTP, FetchSFTP

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index f0061b8..fe277df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.util.List;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.util.StandardValidators;
 
@@ -34,6 +35,8 @@ public interface FileTransfer extends Closeable {
 
     InputStream getInputStream(String remoteFileName) throws IOException;
 
+    InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;
+
     void flush() throws IOException;
 
     FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
@@ -51,127 +54,127 @@ public interface FileTransfer extends Closeable {
     void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
 
     public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
-            .name("Hostname")
-            .description("The fully qualified hostname or IP address of the remote system")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(true)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Hostname")
+        .description("The fully qualified hostname or IP address of the remote system")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
-            .name("Username")
-            .description("Username")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(true)
-            .build();
+        .name("Username")
+        .description("Username")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .build();
     public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
-            .name("Password")
-            .description("Password for the user account")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
-            .sensitive(true)
-            .build();
+        .name("Password")
+        .description("Password for the user account")
+        .addValidator(Validator.VALID)
+        .required(false)
+        .sensitive(true)
+        .build();
     public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Data Timeout")
-            .description("Amount of time to wait before timing out while transferring data")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("30 sec")
-            .build();
+        .name("Data Timeout")
+        .description("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("30 sec")
+        .build();
     public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Connection Timeout")
-            .description("Amount of time to wait before timing out while creating a connection")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("30 sec")
-            .build();
+        .name("Connection Timeout")
+        .description("Amount of time to wait before timing out while creating a connection")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("30 sec")
+        .build();
     public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
-            .name("Remote Path")
-            .description("The path on the remote system from which to pull or push files")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Remote Path")
+        .description("The path on the remote system from which to pull or push files")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor CREATE_DIRECTORY = new PropertyDescriptor.Builder()
-            .name("Create Directory")
-            .description("Specifies whether or not the remote directory should be created if it does not exist.")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
+        .name("Create Directory")
+        .description("Specifies whether or not the remote directory should be created if it does not exist.")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
 
     public static final PropertyDescriptor USE_COMPRESSION = new PropertyDescriptor.Builder()
-            .name("Use Compression")
-            .description("Indicates whether or not ZLIB compression should be used when transferring files")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
+        .name("Use Compression")
+        .description("Indicates whether or not ZLIB compression should be used when transferring files")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     // GET-specific properties
     public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder()
-            .name("Search Recursively")
-            .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
-            .required(true)
-            .defaultValue("false")
-            .allowableValues("true", "false")
-            .build();
+        .name("Search Recursively")
+        .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
+        .required(true)
+        .defaultValue("false")
+        .allowableValues("true", "false")
+        .build();
     public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
-            .name("File Filter Regex")
-            .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
-            .required(false)
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .build();
+        .name("File Filter Regex")
+        .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+        .required(false)
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PATH_FILTER_REGEX = new PropertyDescriptor.Builder()
-            .name("Path Filter Regex")
-            .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
-            .required(false)
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .build();
+        .name("Path Filter Regex")
+        .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+        .required(false)
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
     public static final PropertyDescriptor MAX_SELECTS = new PropertyDescriptor.Builder()
-            .name("Max Selects")
-            .description("The maximum number of files to pull in a single connection")
-            .defaultValue("100")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
+        .name("Max Selects")
+        .description("The maximum number of files to pull in a single connection")
+        .defaultValue("100")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
     public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Remote Poll Batch Size")
-            .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
-                    + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
-                    + "be critical.  Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
-                    + "than normal.")
-            .defaultValue("5000")
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .required(true)
-            .build();
+        .name("Remote Poll Batch Size")
+        .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
+            + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
+            + "be critical.  Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
+            + "than normal.")
+        .defaultValue("5000")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .required(true)
+        .build();
     public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
-            .name("Delete Original")
-            .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
-            .defaultValue("true")
-            .allowableValues("true", "false")
-            .required(true)
-            .build();
+        .name("Delete Original")
+        .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
+        .defaultValue("true")
+        .allowableValues("true", "false")
+        .required(true)
+        .build();
     public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
-            .name("Polling Interval")
-            .description("Determines how long to wait between fetching the listing for new files")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .required(true)
-            .defaultValue("60 sec")
-            .build();
+        .name("Polling Interval")
+        .description("Determines how long to wait between fetching the listing for new files")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .required(true)
+        .defaultValue("60 sec")
+        .build();
     public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
-            .name("Ignore Dotted Files")
-            .description("If true, files whose names begin with a dot (\".\") will be ignored")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .required(true)
-            .build();
+        .name("Ignore Dotted Files")
+        .description("If true, files whose names begin with a dot (\".\") will be ignored")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
     public static final PropertyDescriptor USE_NATURAL_ORDERING = new PropertyDescriptor.Builder()
-            .name("Use Natural Ordering")
-            .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
+        .name("Use Natural Ordering")
+        .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     // PUT-specific properties
     public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@@ -183,77 +186,77 @@ public interface FileTransfer extends Closeable {
     public static final String CONFLICT_RESOLUTION_FAIL = "FAIL";
     public static final String CONFLICT_RESOLUTION_NONE = "NONE";
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
-            .name("Conflict Resolution")
-            .description("Determines how to handle the problem of filename collisions")
-            .required(true)
-            .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
-            .defaultValue(CONFLICT_RESOLUTION_NONE)
-            .build();
+        .name("Conflict Resolution")
+        .description("Determines how to handle the problem of filename collisions")
+        .required(true)
+        .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
+        .defaultValue(CONFLICT_RESOLUTION_NONE)
+        .build();
     public static final PropertyDescriptor REJECT_ZERO_BYTE = new PropertyDescriptor.Builder()
-            .name("Reject Zero-Byte Files")
-            .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+        .name("Reject Zero-Byte Files")
+        .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
     public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder()
-            .name("Dot Rename")
-            .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
-                    + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
-                    + "Temporary Filename property is set.")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+        .name("Dot Rename")
+        .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
+            + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
+            + "Temporary Filename property is set.")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
     public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder()
-            .name("Temporary Filename")
-            .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
-                    + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .required(false)
-            .build();
+        .name("Temporary Filename")
+        .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
+            + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
     public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
-            .name("Last Modified Time")
-            .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
-                    + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
-                    + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Last Modified Time")
+        .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
+            + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
+            + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
-            .name("Permissions")
-            .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
-                    + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
-                    + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
-                    + "fail to change permissions of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Permissions")
+        .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
+            + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
+            + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
+            + "fail to change permissions of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
-            .name("Remote Owner")
-            .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
-                    + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
-                    + "will fail to change the owner of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Remote Owner")
+        .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
+            + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
+            + "will fail to change the owner of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
-            .name("Remote Group")
-            .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
-                    + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
-                    + "will fail to change the group of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Remote Group")
+        .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
+            + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
+            + "will fail to change the group of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The maximum number of FlowFiles to send in a single connection")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("500")
-            .build();
+        .name("Batch Size")
+        .description("The maximum number of FlowFiles to send in a single connection")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("500")
+        .build();
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
new file mode 100644
index 0000000..6e019ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+public interface ListableEntity {
+
+    /**
+     * @return The name of the remote entity
+     */
+    String getName();
+
+    /**
+     * @return the identifier of the remote entity. This may or may not be the same as the name of the
+     *         entity but should be unique across all entities.
+     */
+    String getIdentifier();
+
+
+    /**
+     * @return the timestamp for this entity so that we can be efficient about not performing listings of the same
+     *         entities multiple times
+     */
+    long getTimestamp();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
new file mode 100644
index 0000000..465995e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+
+public class PermissionDeniedException extends IOException {
+    private static final long serialVersionUID = -6215434916883053982L;
+
+    public PermissionDeniedException(final String message) {
+        super(message);
+    }
+
+    public PermissionDeniedException(final String message, final Throwable t) {
+        super(message, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 19955e7..c28f275 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard.util;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
@@ -51,45 +52,45 @@ import com.jcraft.jsch.SftpException;
 public class SFTPTransfer implements FileTransfer {
 
     public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
-            .name("Private Key Path")
-            .description("The fully qualified path to the Private Key file")
-            .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .build();
+        .name("Private Key Path")
+        .description("The fully qualified path to the Private Key file")
+        .required(false)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder()
-            .name("Private Key Passphrase")
-            .description("Password for the private key")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
+        .name("Private Key Passphrase")
+        .description("Password for the private key")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .sensitive(true)
+        .build();
     public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder()
-            .name("Host Key File")
-            .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .required(false)
-            .build();
+        .name("Host Key File")
+        .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .required(false)
+        .build();
     public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder()
-            .name("Strict Host Key Checking")
-            .description("Indicates whether or not strict enforcement of hosts keys should be applied")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
+        .name("Strict Host Key Checking")
+        .description("Indicates whether or not strict enforcement of hosts keys should be applied")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Port")
-            .description("The port that the remote system is listening on for file transfers")
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .required(true)
-            .defaultValue("22")
-            .build();
+        .name("Port")
+        .description("The port that the remote system is listening on for file transfers")
+        .addValidator(StandardValidators.PORT_VALIDATOR)
+        .required(true)
+        .defaultValue("22")
+        .build();
     public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Send Keep Alive On Timeout")
-            .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .required(true)
-            .build();
+        .name("Send Keep Alive On Timeout")
+        .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
 
     /**
      * Dynamic property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
@@ -99,12 +100,12 @@ public class SFTPTransfer implements FileTransfer {
      * This property is dynamic until deemed a worthy inclusion as proper.
      */
     public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder()
-            .name("Disable Directory Listing")
-            .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .dynamic(true)
-            .defaultValue("false")
-            .build();
+        .name("Disable Directory Listing")
+        .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .dynamic(true)
+        .defaultValue("false")
+        .build();
 
     private final ProcessorLog logger;
 
@@ -133,7 +134,16 @@ public class SFTPTransfer implements FileTransfer {
     public List<FileInfo> getListing() throws IOException {
         final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
         final int depth = 0;
-        final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
+
+        final int maxResults;
+        final PropertyValue batchSizeValue = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE);
+        if (batchSizeValue == null) {
+            maxResults = Integer.MAX_VALUE;
+        } else {
+            final Integer configuredValue = batchSizeValue.asInteger();
+            maxResults = configuredValue == null ? Integer.MAX_VALUE : configuredValue;
+        }
+
         final List<FileInfo> listing = new ArrayList<>(1000);
         getListing(path, depth, maxResults, listing);
         return listing;
@@ -222,7 +232,15 @@ public class SFTPTransfer implements FileTransfer {
                 sftp.ls(path, filter);
             }
         } catch (final SftpException e) {
-            throw new IOException("Failed to obtain file listing for " + (path == null ? "current directory" : path), e);
+            final String pathDesc = path == null ? "current directory" : path;
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server");
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions");
+                default:
+                    throw new IOException("Failed to obtain file listing for " + pathDesc, e);
+            }
         }
 
         for (final LsEntry entry : subDirs) {
@@ -251,24 +269,36 @@ public class SFTPTransfer implements FileTransfer {
         }
 
         FileInfo.Builder builder = new FileInfo.Builder()
-                .filename(entry.getFilename())
-                .fullPathFileName(newFullForwardPath)
-                .directory(entry.getAttrs().isDir())
-                .size(entry.getAttrs().getSize())
-                .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
-                .permissions(perms)
-                .owner(Integer.toString(entry.getAttrs().getUId()))
-                .group(Integer.toString(entry.getAttrs().getGId()));
+            .filename(entry.getFilename())
+            .fullPathFileName(newFullForwardPath)
+            .directory(entry.getAttrs().isDir())
+            .size(entry.getAttrs().getSize())
+            .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
+            .permissions(perms)
+            .owner(Integer.toString(entry.getAttrs().getUId()))
+            .group(Integer.toString(entry.getAttrs().getGId()));
         return builder.build();
     }
 
     @Override
     public InputStream getInputStream(final String remoteFileName) throws IOException {
-        final ChannelSftp sftp = getChannel(null);
+        return getInputStream(remoteFileName, null);
+    }
+
+    @Override
+    public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
+        final ChannelSftp sftp = getChannel(flowFile);
         try {
             return sftp.get(remoteFileName);
         } catch (final SftpException e) {
-            throw new IOException("Failed to obtain file content for " + remoteFileName, e);
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server");
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e);
+                default:
+                    throw new IOException("Failed to obtain file content for " + remoteFileName, e);
+            }
         }
     }
 
@@ -283,7 +313,14 @@ public class SFTPTransfer implements FileTransfer {
         try {
             sftp.rm(fullPath);
         } catch (final SftpException e) {
-            throw new IOException("Failed to delete remote file " + fullPath, e);
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server");
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
+                default:
+                    throw new IOException("Failed to delete remote file " + fullPath, e);
+            }
         }
     }
 
@@ -333,10 +370,10 @@ public class SFTPTransfer implements FileTransfer {
             if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
                 ensureDirectoryExists(flowFile, directoryName.getParentFile());
             }
-            logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory});
+            logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
             try {
                 channel.mkdir(remoteDirectory);
-                logger.debug("Created {}", new Object[]{remoteDirectory});
+                logger.debug("Created {}", new Object[] {remoteDirectory});
             } catch (final SftpException e) {
                 throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
             }
@@ -358,9 +395,9 @@ public class SFTPTransfer implements FileTransfer {
 
         final JSch jsch = new JSch();
         try {
-            final Session session = jsch.getSession(ctx.getProperty(USERNAME).getValue(),
-                    ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
-                    ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
+            final Session session = jsch.getSession(ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(),
+                ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
+                ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
 
             final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
             if (hostKeyVal != null) {
@@ -371,7 +408,8 @@ public class SFTPTransfer implements FileTransfer {
             properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no");
             properties.setProperty("PreferredAuthentications", "publickey,password");
 
-            if (ctx.getProperty(FileTransfer.USE_COMPRESSION).asBoolean()) {
+            final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
+            if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
                 properties.setProperty("compression.s2c", "zlib@openssh.com,zlib,none");
                 properties.setProperty("compression.c2s", "zlib@openssh.com,zlib,none");
             } else {
@@ -381,12 +419,12 @@ public class SFTPTransfer implements FileTransfer {
 
             session.setConfig(properties);
 
-            final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).getValue();
+            final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
             if (privateKeyFile != null) {
-                jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).getValue());
+                jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue());
             }
 
-            final String password = ctx.getProperty(FileTransfer.PASSWORD).getValue();
+            final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
             if (password != null) {
                 session.setPassword(password);
             }
@@ -428,7 +466,7 @@ public class SFTPTransfer implements FileTransfer {
                 sftp.exit();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close ChannelSftp due to {}", new Object[]{ex.toString()}, ex);
+            logger.warn("Failed to close ChannelSftp due to {}", new Object[] {ex.toString()}, ex);
         }
         sftp = null;
 
@@ -437,7 +475,7 @@ public class SFTPTransfer implements FileTransfer {
                 session.disconnect();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close session due to {}", new Object[]{ex.toString()}, ex);
+            logger.warn("Failed to close session due to {}", new Object[] {ex.toString()}, ex);
         }
         session = null;
     }
@@ -515,7 +553,7 @@ public class SFTPTransfer implements FileTransfer {
                 int time = (int) (fileModifyTime.getTime() / 1000L);
                 sftp.setMtime(tempPath, time);
             } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e});
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
             }
         }
 
@@ -527,7 +565,7 @@ public class SFTPTransfer implements FileTransfer {
                     sftp.chmod(perms, tempPath);
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e});
+                logger.error("Failed to set permission on {} to {} due to {}", new Object[] {tempPath, permissions, e});
             }
         }
 
@@ -536,7 +574,7 @@ public class SFTPTransfer implements FileTransfer {
             try {
                 sftp.chown(Integer.parseInt(owner), tempPath);
             } catch (final Exception e) {
-                logger.error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e});
+                logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e});
             }
         }
 
@@ -545,7 +583,7 @@ public class SFTPTransfer implements FileTransfer {
             try {
                 sftp.chgrp(Integer.parseInt(group), tempPath);
             } catch (final Exception e) {
-                logger.error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e});
+                logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e});
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ff39ad3..b12fb6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -28,6 +28,7 @@ org.apache.nifi.processors.standard.EvaluateXQuery
 org.apache.nifi.processors.standard.ExecuteStreamCommand
 org.apache.nifi.processors.standard.ExecuteProcess
 org.apache.nifi.processors.standard.ExtractText
+org.apache.nifi.processors.standard.FetchSFTP
 org.apache.nifi.processors.standard.GenerateFlowFile
 org.apache.nifi.processors.standard.GetFile
 org.apache.nifi.processors.standard.GetFTP
@@ -43,6 +44,7 @@ org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
 org.apache.nifi.processors.standard.ListenHTTP
 org.apache.nifi.processors.standard.ListenUDP
+org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
 org.apache.nifi.processors.standard.MergeContent
 org.apache.nifi.processors.standard.ModifyBytes

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
new file mode 100644
index 0000000..ba84939
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestAbstractListProcessor {
+
+    @Test
+    public void testOnlyNewEntriesEmitted() {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", 1492L);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", 1491L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id", 1494L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testStateStoredInDistributedService() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        runner.run();
+
+        proc.addEntity("name", "id", 1492L);
+        runner.run();
+
+        assertEquals(1, cache.stored.size());
+    }
+
+    @Test
+    public void testFetchOnStart() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        runner.run();
+
+        assertEquals(1, cache.fetchCount);
+    }
+
+    private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
+        private final Map<Object, Object> stored = new HashMap<>();
+        private int fetchCount = 0;
+
+        @Override
+        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            return false;
+        }
+
+        @Override
+        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+            return null;
+        }
+
+        @Override
+        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+            return false;
+        }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            stored.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+            fetchCount++;
+            return (V) stored.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+            final Object value = stored.remove(key);
+            return value != null;
+        }
+    }
+
+
+    private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
+        private final List<ListableEntity> entities = new ArrayList<>();
+
+        @Override
+        protected File getPersistenceFile() {
+            return new File("target/ListProcessor-local-state.json");
+        }
+
+        public void addEntity(final String name, final String identifier, final long timestamp) {
+            final ListableEntity entity = new ListableEntity() {
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public String getIdentifier() {
+                    return identifier;
+                }
+
+                @Override
+                public long getTimestamp() {
+                    return timestamp;
+                }
+            };
+
+            entities.add(entity);
+        }
+
+        @Override
+        protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        protected String getPath(final ProcessContext context) {
+            return "/path";
+        }
+
+        @Override
+        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+            return Collections.unmodifiableList(entities);
+        }
+
+        @Override
+        protected boolean isListingResetNecessary(PropertyDescriptor property) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
new file mode 100644
index 0000000..7aa8f9c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.PermissionDeniedException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestFetchFileTransfer {
+
+    @Test
+    public void testContentFetched() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertFalse(proc.closed);
+        runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
+    }
+
+    @Test
+    public void testContentNotFound() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
+    }
+
+    @Test
+    public void testInsufficientPermissions() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        proc.allowAccess = false;
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
+    }
+
+    private static class TestableFetchFileTransfer extends FetchFileTransfer {
+        private boolean allowAccess = true;
+        private boolean closed = false;
+        private final Map<String, byte[]> fileContents = new HashMap<>();
+
+        public void addContent(final String filename, final byte[] content) {
+            this.fileContents.put(filename, content);
+        }
+
+        @Override
+        protected FileTransfer createFileTransfer(final ProcessContext context) {
+            return new FileTransfer() {
+                @Override
+                public void close() throws IOException {
+                    closed = true;
+                }
+
+                @Override
+                public String getHomeDirectory(FlowFile flowFile) throws IOException {
+                    return null;
+                }
+
+                @Override
+                public List<FileInfo> getListing() throws IOException {
+                    return null;
+                }
+
+                @Override
+                public InputStream getInputStream(final String remoteFileName) throws IOException {
+                    return getInputStream(remoteFileName, null);
+                }
+
+                @Override
+                public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
+                    if (!allowAccess) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
+
+                    final byte[] content = fileContents.get(remoteFileName);
+                    if (content == null) {
+                        throw new FileNotFoundException();
+                    }
+
+                    return new ByteArrayInputStream(content);
+                }
+
+                @Override
+                public void flush() throws IOException {
+                }
+
+                @Override
+                public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
+                    return null;
+                }
+
+                @Override
+                public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException {
+                    return null;
+                }
+
+                @Override
+                public void deleteFile(String path, String remoteFileName) throws IOException {
+                    if (!fileContents.containsKey(remoteFileName)) {
+                        throw new FileNotFoundException();
+                    }
+
+                    fileContents.remove(remoteFileName);
+                }
+
+                @Override
+                public void deleteDirectory(String remoteDirectoryName) throws IOException {
+
+                }
+
+                @Override
+                public boolean isClosed() {
+                    return false;
+                }
+
+                @Override
+                public String getProtocolName() {
+                    return "test";
+                }
+
+                @Override
+                public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
+
+                }
+            };
+        }
+    }
+}


[4/5] nifi git commit: NIFI-673: Initial implementation of ListSFTP, FetchSFTP

Posted by ma...@apache.org.
NIFI-673: Initial implementation of ListSFTP, FetchSFTP


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1d57931
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1d57931
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1d57931

Branch: refs/heads/master
Commit: d1d57931bf996a230ab7941cb6c1524286c97606
Parents: 8a80060
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 4 15:48:28 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:13:02 2015 -0400

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         | 505 +++++++++++++++++++
 .../processors/standard/FetchFileTransfer.java  | 296 +++++++++++
 .../nifi/processors/standard/FetchSFTP.java     |  89 ++++
 .../processors/standard/ListFileTransfer.java   | 103 ++++
 .../nifi/processors/standard/ListSFTP.java      |  81 +++
 .../processors/standard/util/EntityListing.java |  71 +++
 .../processors/standard/util/FTPTransfer.java   | 135 ++---
 .../nifi/processors/standard/util/FileInfo.java |  18 +-
 .../processors/standard/util/FileTransfer.java  | 335 ++++++------
 .../standard/util/ListableEntity.java           |  40 ++
 .../util/PermissionDeniedException.java         |  32 ++
 .../processors/standard/util/SFTPTransfer.java  | 174 ++++---
 .../org.apache.nifi.processor.Processor         |   2 +
 .../standard/TestAbstractListProcessor.java     | 221 ++++++++
 .../standard/TestFetchFileTransfer.java         | 186 +++++++
 15 files changed, 1988 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
new file mode 100644
index 0000000..8a7fade
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.EntityListing;
+import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * <p>
+ * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
+ * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
+ * we identity the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
+ * </p>
+ *
+ * <p>
+ * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
+ * or entities that have been modified will be emitted from the Processor.
+ * </p>
+ *
+ * <p>
+ * In order to make use of this abstract class, the entities listed must meet the following criteria:
+ * <ul>
+ * <li>
+ * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
+ * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
+ * </li>
+ * <li>
+ * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
+ * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+ * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
+ * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
+ * seen already.
+ * </li>
+ * <li>
+ * Entity must have a user-readable name that can be used for logging purposes.
+ * </li>
+ * </p>
+ *
+ * <p>
+ * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
+ * two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
+ * stored is the latest timestamp that has been pulled (as determined by the timestamps of the entities that are returned), as well as the unique identifier of
+ * each entity that has that timestamp. See the section above for information about how these pieces of information are used in order to determine entity uniqueness.
+ * </p>
+ *
+ * <p>
+ * In addition to storing state locally, the Processor exposes an optional <code>Distributed Cache Service</code> property. In standalone deployment of NiFi, this is
+ * not necessary. However, in a clustered environment, subclasses of this class are expected to be run only on primary node. While this means that the local state is
+ * accurate as long as the primary node remains constant, the primary node in the cluster can be changed. As a result, if running in a clustered environment, it is
+ * recommended that this property be set. This allows the same state that is described above to also be replicated across the cluster. If this property is set, then
+ * on restart the Processor will not begin listing until it has retrieved an updated state from this service, as it does not know whether or not another node has
+ * modified the state in the mean time.
+ * </p>
+ *
+ * <p>
+ * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
+ * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
+ * the configured dataflow.
+ * </p>
+ *
+ * <p>
+ * Subclasses are responsible for the following:
+ *
+ * <ul>
+ * <li>
+ * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
+ * entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
+ * entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
+ * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
+ * </li>
+ * <li>
+ * Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
+ * {@link #createAttributes(ListableEntity, ProcessContext)}.
+ * </li>
+ * <li>
+ * Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
+ * within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this does concept
+ * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
+ * </li>
+ * <li>
+ * Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
+ * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
+ * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
+ * </li>
+ * </ul>
+ * </p>
+ */
+@TriggerSerially
+public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+        .name("Distributed Cache Service")
+        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
+            + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
+            + "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
+        .required(false)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
+
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are received are routed to success")
+        .build();
+
+
+    private volatile Long lastListingTime = null;
+    private volatile Set<String> latestIdentifiersListed = new HashSet<>();
+    private volatile boolean electedPrimaryNode = false;
+
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        return properties;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (isListingResetNecessary(descriptor)) {
+            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
+            latestIdentifiersListed = new HashSet<>();
+        }
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    protected String getKey(final String directory) {
+        return getIdentifier() + ".lastListingTime." + directory;
+    }
+
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
+            electedPrimaryNode = true;
+        }
+    }
+
+    private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(serializedState);
+        return mapper.readValue(jsonNode, EntityListing.class);
+    }
+
+
+    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
+        // Determine the timestamp for the last file that we've listed.
+        Long minTimestamp = lastListingTime;
+        if (minTimestamp == null || electedPrimaryNode) {
+            // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
+            // we have performed a listing. In this case,
+            // First, attempt to get timestamp from distributed cache service.
+            if (client != null) {
+                try {
+                    final StringSerDe serde = new StringSerDe();
+                    final String serializedState = client.get(getKey(directory), serde, serde);
+                    if (serializedState == null || serializedState.isEmpty()) {
+                        minTimestamp = null;
+                        this.latestIdentifiersListed = Collections.emptySet();
+                    } else {
+                        final EntityListing listing = deserialize(serializedState);
+                        this.lastListingTime = listing.getLatestTimestamp().getTime();
+                        minTimestamp = listing.getLatestTimestamp().getTime();
+                        this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers());
+                    }
+
+                    this.lastListingTime = minTimestamp;
+                    electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
+                } catch (final IOException ioe) {
+                    throw ioe;
+                }
+            }
+
+            // Check the persistence file. We want to use the latest timestamp that we have so that
+            // we don't duplicate data.
+            try {
+                final File persistenceFile = getPersistenceFile();
+                if (persistenceFile.exists()) {
+                    try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                        final Properties props = new Properties();
+                        props.load(fis);
+
+                        // get the local timestamp for this directory, if it exists.
+                        final String locallyPersistedValue = props.getProperty(directory);
+                        if (locallyPersistedValue != null) {
+                            final EntityListing listing = deserialize(locallyPersistedValue);
+                            final long localTimestamp = listing.getLatestTimestamp().getTime();
+
+                            // If distributed state doesn't have an entry or the local entry is later than the distributed state,
+                            // update the distributed state so that we are in sync.
+                            if (client != null && (minTimestamp == null || localTimestamp > minTimestamp)) {
+                                minTimestamp = localTimestamp;
+
+                                // Our local persistence file shows a later time than the Distributed service.
+                                // Update the distributed service to match our local state.
+                                try {
+                                    final StringSerDe serde = new StringSerDe();
+                                    client.put(getKey(directory), locallyPersistedValue, serde, serde);
+                                } catch (final IOException ioe) {
+                                    getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
+                                        + "state due to {}. If a new node performs Listing, data duplication may occur",
+                                        new Object[] {directory, locallyPersistedValue, ioe});
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
+            }
+        }
+
+        return minTimestamp;
+    }
+
+
+    private String serializeState(final List<T> entities) throws JsonGenerationException, JsonMappingException, IOException {
+        // we need to keep track of all files that we pulled in that had a modification time equal to
+        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
+        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
+        // later in the same millisecond.
+        if (entities.isEmpty()) {
+            return null;
+        } else {
+            final List<T> sortedEntities = new ArrayList<>(entities);
+            Collections.sort(sortedEntities, new Comparator<ListableEntity>() {
+                @Override
+                public int compare(final ListableEntity o1, final ListableEntity o2) {
+                    return Long.compare(o1.getTimestamp(), o2.getTimestamp());
+                }
+            });
+
+            final long latestListingModTime = sortedEntities.get(sortedEntities.size() - 1).getTimestamp();
+            final Set<String> idsWithTimestampEqualToListingTime = new HashSet<>();
+            for (int i = sortedEntities.size() - 1; i >= 0; i--) {
+                final ListableEntity entity = sortedEntities.get(i);
+                if (entity.getTimestamp() == latestListingModTime) {
+                    idsWithTimestampEqualToListingTime.add(entity.getIdentifier());
+                }
+            }
+
+            this.latestIdentifiersListed = idsWithTimestampEqualToListingTime;
+
+            final EntityListing listing = new EntityListing();
+            listing.setLatestTimestamp(new Date(latestListingModTime));
+            final Set<String> ids = new HashSet<>();
+            for (final String id : idsWithTimestampEqualToListingTime) {
+                ids.add(id);
+            }
+            listing.setMatchingIdentifiers(ids);
+
+            final ObjectMapper mapper = new ObjectMapper();
+            final String serializedState = mapper.writerWithType(EntityListing.class).writeValueAsString(listing);
+            return serializedState;
+        }
+    }
+
+    protected void persistLocalState(final String path, final String serializedState) throws IOException {
+        // we need to keep track of all files that we pulled in that had a modification time equal to
+        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
+        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
+        // later in the same millisecond.
+        final File persistenceFile = getPersistenceFile();
+        final File dir = persistenceFile.getParentFile();
+        if (!dir.exists() && !dir.mkdirs()) {
+            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
+        }
+
+        final Properties props = new Properties();
+        if (persistenceFile.exists()) {
+            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                props.load(fis);
+            }
+        }
+
+        props.setProperty(path, serializedState);
+
+        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+            props.store(fos, null);
+        }
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String path = getPath(context);
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(path, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+            context.yield();
+            return;
+        }
+
+        final List<T> entityList;
+        try {
+            entityList = performListing(context, minTimestamp);
+        } catch (final IOException e) {
+            getLogger().error("Failed to perform listing on remote host due to {}", e);
+            context.yield();
+            return;
+        }
+
+        if (entityList == null) {
+            context.yield();
+            return;
+        }
+
+        int listCount = 0;
+        Long latestListingTimestamp = null;
+        for (final T entity : entityList) {
+            final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp ||
+                (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
+
+            // Create the FlowFile for this path.
+            if (list) {
+                final Map<String, String> attributes = createAttributes(entity, context);
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+                listCount++;
+
+                if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) {
+                    latestListingTimestamp = entity.getTimestamp();
+                }
+            }
+        }
+
+        if (listCount > 0) {
+            getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount});
+            session.commit();
+
+            // We have performed a listing and pushed the FlowFiles out.
+            // Now, we need to persist state about the Last Modified timestamp of the newest file
+            // that we pulled in. We do this in order to avoid pulling in the same file twice.
+            // However, we want to save the state both locally and remotely.
+            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
+            // previously Primary Node left off.
+            // We also store the state locally so that if the node is restarted, and the node cannot contact
+            // the distributed state cache, the node can continue to run (if it is primary node).
+            String serializedState = null;
+            try {
+                serializedState = serializeState(entityList);
+            } catch (final Exception e) {
+                getLogger().error("Failed to serialize state due to {}", new Object[] {e});
+            }
+
+            if (serializedState != null) {
+                // Save our state locally.
+                try {
+                    persistLocalState(path, serializedState);
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
+                }
+
+                // Attempt to save state to remote server.
+                if (client != null) {
+                    try {
+                        client.put(getKey(path), serializedState, new StringSerDe(), new StringSerDe());
+                    } catch (final IOException ioe) {
+                        getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
+                    }
+                }
+            }
+
+            lastListingTime = latestListingTimestamp;
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
+
+            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
+            if (lastListingTime == null) {
+                lastListingTime = 0L;
+            }
+
+            return;
+        }
+    }
+
+
+    /**
+     * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity
+     * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no
+     * content. The attributes that will be included are exactly the attributes that are returned by this method.
+     *
+     * @param entity the entity represented by the FlowFile
+     * @param context the ProcessContext for obtaining configuration information
+     * @return a Map of attributes for this entity
+     */
+    protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);
+
+    /**
+     * Returns the path to perform a listing on.
+     * Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
+     * within that path. This method is responsible for returning the path that is currently being polled for entities. If this does concept
+     * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
+     *
+     * @param context the ProcessContex to use in order to obtain configuration
+     * @return the path that is to be used to perform the listing, or <code>null</code> if not applicable.
+     */
+    protected abstract String getPath(final ProcessContext context);
+
+    /**
+     * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted"
+     * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is
+     * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp
+     * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
+     * if the filtering can be performed on the server side prior to retrieving the information.
+     *
+     * @param context the ProcessContex to use in order to pull the appropriate entities
+     * @param minTimestamp the minimum timestamp of entities that should be returned.
+     *
+     * @return a Listing of entities that have a timestamp >= minTimestamp
+     */
+    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
+
+    /**
+     * Determines whether or not the listing must be reset if the value of the given property is changed
+     *
+     * @param property the property that has changed
+     * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise.
+     */
+    protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);
+
+
+
+    private static class StringSerDe implements Serializer<String>, Deserializer<String> {
+        @Override
+        public String deserialize(final byte[] value) throws DeserializationException, IOException {
+            if (value == null) {
+                return null;
+            }
+
+            return new String(value, StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
+            out.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
new file mode 100644
index 0000000..5eecac3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.PermissionDeniedException;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.Tuple;
+
+/**
+ * A base class for FetchSFTP, FetchFTP processors
+ */
+public abstract class FetchFileTransfer extends AbstractProcessor {
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+        .name("Hostname")
+        .description("The fully-qualified hostname or IP address of the host to fetch the data from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+    static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
+        .name("Port")
+        .description("The port to connect to on the remote host to fetch the data from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+    public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder()
+        .name("Remote File")
+        .description("The fully qualified filename on the remote system")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
+        .name("Delete Original")
+        .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
+        .defaultValue("true")
+        .allowableValues("true", "false")
+        .required(true)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are received are routed to success")
+        .build();
+    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
+        .name("comms.failure")
+        .description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.")
+        .build();
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+        .name("not.found")
+        .description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.")
+        .build();
+    static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder()
+        .name("permission.denied")
+        .description("Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.")
+        .build();
+
+    private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>();
+    private final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L); // amount of time to wait before closing an idle connection
+    private volatile long lastClearTime = System.currentTimeMillis();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_NOT_FOUND);
+        relationships.add(REL_PERMISSION_DENIED);
+        relationships.add(REL_COMMS_FAILURE);
+        return relationships;
+    }
+
+    /**
+     * Close connections that are idle or optionally close all connections.
+     * Connections are considered "idle" if they have not been used in 10 seconds.
+     *
+     * @param closeNonIdleConnections if <code>true</code> will close all connection; if <code>false</code> will close only idle connections
+     */
+    private void closeConnections(final boolean closeNonIdleConnections) {
+        for (final Map.Entry<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> entry : fileTransferMap.entrySet()) {
+            final BlockingQueue<FileTransferIdleWrapper> wrapperQueue = entry.getValue();
+
+            final List<FileTransferIdleWrapper> putBack = new ArrayList<>();
+            FileTransferIdleWrapper wrapper;
+            while ((wrapper = wrapperQueue.poll()) != null) {
+                final long lastUsed = wrapper.getLastUsed();
+                final long nanosSinceLastUse = System.nanoTime() - lastUsed;
+                if (!closeNonIdleConnections && TimeUnit.NANOSECONDS.toMillis(nanosSinceLastUse) < IDLE_CONNECTION_MILLIS) {
+                    putBack.add(wrapper);
+                } else {
+                    try {
+                        wrapper.getFileTransfer().close();
+                    } catch (final IOException ioe) {
+                        getLogger().warn("Failed to close Idle Connection due to {}", new Object[] {ioe}, ioe);
+                    }
+                }
+            }
+
+            for (final FileTransferIdleWrapper toPutBack : putBack) {
+                wrapperQueue.offer(toPutBack);
+            }
+        }
+    }
+
+    @OnStopped
+    public void cleanup() {
+        // close all connections
+        closeConnections(true);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HOSTNAME);
+        properties.add(UNDEFAULTED_PORT);
+        properties.add(REMOTE_FILENAME);
+        properties.add(DELETE_ORIGINAL);
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final int port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions(flowFile).asInteger();
+        final String filename = context.getProperty(REMOTE_FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // Try to get a FileTransfer object from our cache.
+        BlockingQueue<FileTransferIdleWrapper> transferQueue;
+        synchronized (fileTransferMap) {
+            final Tuple<String, Integer> tuple = new Tuple<>(host, port);
+
+            transferQueue = fileTransferMap.get(tuple);
+            if (transferQueue == null) {
+                transferQueue = new LinkedBlockingQueue<>();
+                fileTransferMap.put(tuple, transferQueue);
+            }
+
+            // periodically close idle connections
+            if (System.currentTimeMillis() - lastClearTime > IDLE_CONNECTION_MILLIS) {
+                closeConnections(false);
+                lastClearTime = System.currentTimeMillis();
+            }
+        }
+
+        // we have a queue of FileTransfer Objects. Get one from the queue or create a new one.
+        FileTransfer transfer;
+        FileTransferIdleWrapper transferWrapper = transferQueue.poll();
+        if (transferWrapper == null) {
+            transfer = createFileTransfer(context);
+        } else {
+            transfer = transferWrapper.getFileTransfer();
+        }
+
+        // Pull data from remote system.
+        final InputStream in;
+        try {
+            in = transfer.getInputStream(filename, flowFile);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    StreamUtils.copy(in, out);
+                }
+            });
+            transfer.flush();
+            transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
+        } catch (final FileNotFoundException e) {
+            getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
+                new Object[] {flowFile, filename, host, REL_NOT_FOUND.getName()});
+            session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
+            session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
+            return;
+        } catch (final PermissionDeniedException e) {
+            getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
+                new Object[] {flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
+            session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
+            session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
+            return;
+        } catch (final IOException e) {
+            try {
+                transfer.close();
+            } catch (final IOException e1) {
+                getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e);
+            }
+
+            getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to failure",
+                new Object[] {flowFile, filename, host, port, e.toString()}, e);
+            session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
+            return;
+        }
+
+        // Add FlowFile attributes
+        final String protocolName = transfer.getProtocolName();
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(protocolName + ".remote.host", host);
+        attributes.put(protocolName + ".remote.port", String.valueOf(port));
+        attributes.put(protocolName + ".remote.filename", filename);
+        attributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        // emit provenance event and transfer FlowFile
+        session.getProvenanceReporter().modifyContent(flowFile, "Content replaced with content from " + protocolName + "://" + host + ":" + port + "/" + filename,
+            stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        session.transfer(flowFile, REL_SUCCESS);
+
+        // delete remote file is necessary
+        final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean();
+        if (deleteOriginal) {
+            try {
+                transfer.deleteFile(null, filename);
+            } catch (final FileNotFoundException e) {
+                // file doesn't exist -- effectively the same as removing it. Move on.
+            } catch (final IOException ioe) {
+                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe);
+            }
+        }
+    }
+
+
+    /**
+     * Creates a new instance of a FileTransfer that can be used to pull files from a remote system.
+     *
+     * @param context the ProcessContext to use in order to obtain configured properties
+     * @return a FileTransfer that can be used to pull files from a remote system
+     */
+    protected abstract FileTransfer createFileTransfer(ProcessContext context);
+
+    /**
+     * Wrapper around a FileTransfer object that is used to know when the FileTransfer was last used, so that
+     * we have the ability to close connections that are "idle," or unused for some period of time.
+     */
+    private static class FileTransferIdleWrapper {
+        private final FileTransfer fileTransfer;
+        private final long lastUsed;
+
+        public FileTransferIdleWrapper(final FileTransfer fileTransfer, final long lastUsed) {
+            this.fileTransfer = fileTransfer;
+            this.lastUsed = lastUsed;
+        }
+
+        public FileTransfer getFileTransfer() {
+            return fileTransfer;
+        }
+
+        public long getLastUsed() {
+            return this.lastUsed;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
new file mode 100644
index 0000000..6387e19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.SFTPTransfer;
+
+
+@SupportsBatching
+@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
+@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname or IP address from which the file was pulled"),
+    @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"),
+    @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"),
+    @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"),
+})
+public class FetchSFTP extends FetchFileTransfer {
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FetchFileTransfer.HOSTNAME);
+        properties.add(SFTPTransfer.PORT);
+        properties.add(SFTPTransfer.USERNAME);
+        properties.add(SFTPTransfer.PASSWORD);
+        properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
+        properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
+        properties.add(FetchFileTransfer.REMOTE_FILENAME);
+        properties.add(SFTPTransfer.DELETE_ORIGINAL);
+        properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
+        properties.add(SFTPTransfer.DATA_TIMEOUT);
+        properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+        properties.add(SFTPTransfer.HOST_KEY_FILE);
+        properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
+        properties.add(SFTPTransfer.USE_COMPRESSION);
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        if (!validationContext.getProperty(SFTPTransfer.PASSWORD).isSet() && !(validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PASSPHRASE).isSet()
+            && validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet())) {
+            return Collections.singleton(new ValidationResult.Builder()
+                .subject("Password")
+                .valid(false)
+                .explanation("Must set either password or Private Key Path & Passphrase")
+                .build());
+        }
+
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected FileTransfer createFileTransfer(final ProcessContext context) {
+        return new SFTPTransfer(context, getLogger());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
new file mode 100644
index 0000000..d6e1cd1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+
+public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+        .name("Hostname")
+        .description("The fully qualified hostname or IP address of the remote system")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
+        .name("Remote Path")
+        .description("The path on the remote system from which to pull or push files")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue(".")
+        .build();
+
+
+    @Override
+    protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        attributes.put("file.owner", fileInfo.getOwner());
+        attributes.put("file.group", fileInfo.getGroup());
+        attributes.put("file.permissions", fileInfo.getPermissions());
+        attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
+
+        final String fullPath = fileInfo.getFullPathFileName();
+        if (fullPath != null) {
+            final int index = fullPath.lastIndexOf("/");
+            if (index > -1) {
+                final String path = fullPath.substring(0, index);
+                attributes.put(CoreAttributes.PATH.key(), path);
+            }
+        }
+        return attributes;
+    }
+
+    @Override
+    protected String getPath(final ProcessContext context) {
+        return context.getProperty(REMOTE_PATH).getValue();
+    }
+
+    @Override
+    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+        final FileTransfer transfer = getFileTransfer(context);
+        final List<FileInfo> listing = transfer.getListing();
+        if (minTimestamp == null) {
+            return listing;
+        }
+
+        final Iterator<FileInfo> itr = listing.iterator();
+        while (itr.hasNext()) {
+            final FileInfo next = itr.next();
+            if (next.getLastModifiedTime() < minTimestamp) {
+                itr.remove();
+            }
+        }
+
+        return listing;
+    }
+
+    @Override
+    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
+        return HOSTNAME.equals(property) || REMOTE_PATH.equals(property);
+    }
+
+    protected abstract FileTransfer getFileTransfer(final ProcessContext context);
+
+    protected abstract String getProtocolName();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
new file mode 100644
index 0000000..3b6b69e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.SFTPTransfer;
+
+@TriggerSerially
+@Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"})
+@CapabilityDescription("Performs a listing of the files residing on an SFTP server. For each file that is found on the remote server, a new FlowFile will be created with the filename attribute "
+    + "set to the name of the file on the remote server. This can then be used in conjunction with FetchSFTP in order to fetch those files.")
+@SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
+    @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
+    @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
+    @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
+    @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"),
+    @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"),
+})
+public class ListSFTP extends ListFileTransfer {
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SFTPTransfer.HOSTNAME);
+        properties.add(SFTPTransfer.PORT);
+        properties.add(SFTPTransfer.USERNAME);
+        properties.add(SFTPTransfer.PASSWORD);
+        properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
+        properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
+        properties.add(REMOTE_PATH);
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(SFTPTransfer.RECURSIVE_SEARCH);
+        properties.add(SFTPTransfer.FILE_FILTER_REGEX);
+        properties.add(SFTPTransfer.PATH_FILTER_REGEX);
+        properties.add(SFTPTransfer.IGNORE_DOTTED_FILES);
+        properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING);
+        properties.add(SFTPTransfer.HOST_KEY_FILE);
+        properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
+        properties.add(SFTPTransfer.DATA_TIMEOUT);
+        properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+        return properties;
+    }
+
+    @Override
+    protected FileTransfer getFileTransfer(final ProcessContext context) {
+        return new SFTPTransfer(context, getLogger());
+    }
+
+    @Override
+    protected String getProtocolName() {
+        return "sftp";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
new file mode 100644
index 0000000..56489f0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.util.Collection;
+import java.util.Date;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * A simple POJO for maintaining state about the last entities listed by an AbstractListProcessor that was performed so that
+ * we can avoid pulling the same file multiple times
+ */
+@XmlType(name = "listing")
+public class EntityListing {
+
+    private Date latestTimestamp;
+    private Collection<String> matchingIdentifiers;
+
+    /**
+     * @return the modification date of the newest file that was contained in the listing
+     */
+    public Date getLatestTimestamp() {
+        return latestTimestamp;
+    }
+
+    /**
+     * Sets the timestamp of the modification date of the newest file that was contained in the listing
+     *
+     * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the listing
+     */
+    public void setLatestTimestamp(Date latestTimestamp) {
+        this.latestTimestamp = latestTimestamp;
+    }
+
+    /**
+     * @return a Collection containing the identifiers of all entities in the listing whose timestamp
+     *         was equal to {@link #getLatestTimestamp()}
+     */
+    @XmlTransient
+    public Collection<String> getMatchingIdentifiers() {
+        return matchingIdentifiers;
+    }
+
+    /**
+     * Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was
+     * equal to {@link #getLatestTimestamp()}
+     * 
+     * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp
+     */
+    public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) {
+        this.matchingIdentifiers = matchingIdentifiers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 41a42bb..7f659d4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -34,16 +34,16 @@ import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPHTTPClient;
+import org.apache.commons.net.ftp.FTPReply;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.commons.net.ftp.FTPClient;
-import org.apache.commons.net.ftp.FTPFile;
-import org.apache.commons.net.ftp.FTPHTTPClient;
-import org.apache.commons.net.ftp.FTPReply;
 
 public class FTPTransfer implements FileTransfer {
 
@@ -57,53 +57,53 @@ public class FTPTransfer implements FileTransfer {
     public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name();
 
     public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder()
-            .name("Connection Mode")
-            .description("The FTP Connection Mode")
-            .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE)
-            .defaultValue(CONNECTION_MODE_PASSIVE)
-            .build();
+        .name("Connection Mode")
+        .description("The FTP Connection Mode")
+        .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE)
+        .defaultValue(CONNECTION_MODE_PASSIVE)
+        .build();
     public static final PropertyDescriptor TRANSFER_MODE = new PropertyDescriptor.Builder()
-            .name("Transfer Mode")
-            .description("The FTP Transfer Mode")
-            .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII)
-            .defaultValue(TRANSFER_MODE_BINARY)
-            .build();
+        .name("Transfer Mode")
+        .description("The FTP Transfer Mode")
+        .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII)
+        .defaultValue(TRANSFER_MODE_BINARY)
+        .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Port")
-            .description("The port that the remote system is listening on for file transfers")
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .required(true)
-            .defaultValue("21")
-            .build();
+        .name("Port")
+        .description("The port that the remote system is listening on for file transfers")
+        .addValidator(StandardValidators.PORT_VALIDATOR)
+        .required(true)
+        .defaultValue("21")
+        .build();
     public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
-            .name("Proxy Type")
-            .description("Proxy type used for file transfers")
-            .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
-            .defaultValue(PROXY_TYPE_DIRECT)
-            .build();
+        .name("Proxy Type")
+        .description("Proxy type used for file transfers")
+        .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
+        .defaultValue(PROXY_TYPE_DIRECT)
+        .build();
     public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
-            .name("Proxy Host")
-            .description("The fully qualified hostname or IP address of the proxy server")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+        .name("Proxy Host")
+        .description("The fully qualified hostname or IP address of the proxy server")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
-            .name("Proxy Port")
-            .description("The port of the proxy server")
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .build();
+        .name("Proxy Port")
+        .description("The port of the proxy server")
+        .addValidator(StandardValidators.PORT_VALIDATOR)
+        .build();
     public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
-            .name("Http Proxy Username")
-            .description("Http Proxy Username")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
-            .build();
+        .name("Http Proxy Username")
+        .description("Http Proxy Username")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
     public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
-            .name("Http Proxy Password")
-            .description("Http Proxy Password")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
-            .sensitive(true)
-            .build();
+        .name("Http Proxy Password")
+        .description("Http Proxy Password")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .sensitive(true)
+        .build();
 
     private final ProcessorLog logger;
 
@@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
                 client.disconnect();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close FTPClient due to {}", new Object[]{ex.toString()}, ex);
+            logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex);
         }
         client = null;
     }
@@ -261,19 +261,24 @@ public class FTPTransfer implements FileTransfer {
         perms.append(file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) ? "x" : "-");
 
         FileInfo.Builder builder = new FileInfo.Builder()
-                .filename(file.getName())
-                .fullPathFileName(newFullForwardPath)
-                .directory(file.isDirectory())
-                .size(file.getSize())
-                .lastModifiedTime(file.getTimestamp().getTimeInMillis())
-                .permissions(perms.toString())
-                .owner(file.getUser())
-                .group(file.getGroup());
+            .filename(file.getName())
+            .fullPathFileName(newFullForwardPath)
+            .directory(file.isDirectory())
+            .size(file.getSize())
+            .lastModifiedTime(file.getTimestamp().getTimeInMillis())
+            .permissions(perms.toString())
+            .owner(file.getUser())
+            .group(file.getGroup());
         return builder.build();
     }
 
     @Override
-    public InputStream getInputStream(final String remoteFileName) throws IOException {
+    public InputStream getInputStream(String remoteFileName) throws IOException {
+        return getInputStream(remoteFileName, null);
+    }
+
+    @Override
+    public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
         final FTPClient client = getClient(null);
         InputStream in = client.retrieveFileStream(remoteFileName);
         if (in == null) {
@@ -329,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
         final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
 
         if (!cdSuccessful) {
-            logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory});
+            logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory });
             if (client.makeDirectory(remoteDirectory)) {
-                logger.debug("Created {}", new Object[]{remoteDirectory});
+                logger.debug("Created {}", new Object[] { remoteDirectory });
             } else {
                 throw new IOException("Failed to create remote directory " + remoteDirectory);
             }
@@ -387,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
                 final String time = outformat.format(fileModifyTime);
                 if (!client.setModificationTime(tempFilename, time)) {
                     // FTP server probably doesn't support MFMT command
-                    logger.warn("Could not set lastModifiedTime on {} to {}", new Object[]{flowFile, lastModifiedTime});
+                    logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime });
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{flowFile, lastModifiedTime, e});
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e });
             }
         }
         final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@@ -399,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
                 int perms = numberPermissions(permissions);
                 if (perms >= 0) {
                     if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
-                        logger.warn("Could not set permission on {} to {}", new Object[]{flowFile, permissions});
+                        logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions });
                     }
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set permission on {} to {} due to {}", new Object[]{flowFile, permissions, e});
+                logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e });
             }
         }
 
         if (!filename.equals(tempFilename)) {
             try {
-                logger.debug("Renaming remote path from {} to {} for {}", new Object[]{tempFilename, filename, flowFile});
+                logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile });
                 final boolean renameSuccessful = client.rename(tempFilename, filename);
                 if (!renameSuccessful) {
                     throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@@ -513,13 +518,13 @@ public class FTPTransfer implements FileTransfer {
             inetAddress = InetAddress.getByName(remoteHostname);
         }
 
-        client.connect(inetAddress, ctx.getProperty(PORT).asInteger());
+        client.connect(inetAddress, ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger());
         this.closed = false;
         client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
         client.setSoTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
 
-        final String username = ctx.getProperty(USERNAME).getValue();
-        final String password = ctx.getProperty(PASSWORD).getValue();
+        final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String password = ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
         final boolean loggedIn = client.login(username, password);
         if (!loggedIn) {
             throw new IOException("Could not login for user '" + username + "'");
@@ -532,7 +537,7 @@ public class FTPTransfer implements FileTransfer {
             client.enterLocalPassiveMode();
         }
 
-        final String transferMode = ctx.getProperty(TRANSFER_MODE).getValue();
+        final String transferMode = ctx.getProperty(TRANSFER_MODE).evaluateAttributeExpressions(flowFile).getValue();
         final int fileType = (transferMode.equalsIgnoreCase(TRANSFER_MODE_ASCII)) ? FTPClient.ASCII_FILE_TYPE : FTPClient.BINARY_FILE_TYPE;
         if (!client.setFileType(fileType)) {
             throw new IOException("Unable to set transfer mode to type " + transferMode);

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
index c57b4e0..b893f75 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processors.standard.util;
 
 import java.io.Serializable;
 
-public class FileInfo implements Comparable<FileInfo>, Serializable {
+public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity {
 
     private static final long serialVersionUID = 1L;
 
@@ -164,4 +164,20 @@ public class FileInfo implements Comparable<FileInfo>, Serializable {
             return this;
         }
     }
+
+    @Override
+    public String getName() {
+        return getFileName();
+    }
+
+    @Override
+    public String getIdentifier() {
+        final String fullPathName = getFullPathFileName();
+        return fullPathName == null ? getName() : fullPathName;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return getLastModifiedTime();
+    }
 }