You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:48 UTC

[22/50] [abbrv] nifi git commit: NIFI-673: Initial implementation of ListSFTP, FetchSFTP

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index f0061b8..fe277df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.util.List;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.util.StandardValidators;
 
@@ -34,6 +35,8 @@ public interface FileTransfer extends Closeable {
 
     InputStream getInputStream(String remoteFileName) throws IOException;
 
+    InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;
+
     void flush() throws IOException;
 
     FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
@@ -51,127 +54,127 @@ public interface FileTransfer extends Closeable {
     void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
 
     public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
-            .name("Hostname")
-            .description("The fully qualified hostname or IP address of the remote system")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(true)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Hostname")
+        .description("The fully qualified hostname or IP address of the remote system")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
-            .name("Username")
-            .description("Username")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(true)
-            .build();
+        .name("Username")
+        .description("Username")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(true)
+        .build();
     public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
-            .name("Password")
-            .description("Password for the user account")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .required(false)
-            .sensitive(true)
-            .build();
+        .name("Password")
+        .description("Password for the user account")
+        .addValidator(Validator.VALID)
+        .required(false)
+        .sensitive(true)
+        .build();
     public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Data Timeout")
-            .description("Amount of time to wait before timing out while transferring data")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("30 sec")
-            .build();
+        .name("Data Timeout")
+        .description("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("30 sec")
+        .build();
     public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Connection Timeout")
-            .description("Amount of time to wait before timing out while creating a connection")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("30 sec")
-            .build();
+        .name("Connection Timeout")
+        .description("Amount of time to wait before timing out while creating a connection")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("30 sec")
+        .build();
     public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
-            .name("Remote Path")
-            .description("The path on the remote system from which to pull or push files")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Remote Path")
+        .description("The path on the remote system from which to pull or push files")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor CREATE_DIRECTORY = new PropertyDescriptor.Builder()
-            .name("Create Directory")
-            .description("Specifies whether or not the remote directory should be created if it does not exist.")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
+        .name("Create Directory")
+        .description("Specifies whether or not the remote directory should be created if it does not exist.")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
 
     public static final PropertyDescriptor USE_COMPRESSION = new PropertyDescriptor.Builder()
-            .name("Use Compression")
-            .description("Indicates whether or not ZLIB compression should be used when transferring files")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
+        .name("Use Compression")
+        .description("Indicates whether or not ZLIB compression should be used when transferring files")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     // GET-specific properties
     public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder()
-            .name("Search Recursively")
-            .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
-            .required(true)
-            .defaultValue("false")
-            .allowableValues("true", "false")
-            .build();
+        .name("Search Recursively")
+        .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
+        .required(true)
+        .defaultValue("false")
+        .allowableValues("true", "false")
+        .build();
     public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
-            .name("File Filter Regex")
-            .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
-            .required(false)
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .build();
+        .name("File Filter Regex")
+        .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+        .required(false)
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PATH_FILTER_REGEX = new PropertyDescriptor.Builder()
-            .name("Path Filter Regex")
-            .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
-            .required(false)
-            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-            .build();
+        .name("Path Filter Regex")
+        .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+        .required(false)
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .build();
     public static final PropertyDescriptor MAX_SELECTS = new PropertyDescriptor.Builder()
-            .name("Max Selects")
-            .description("The maximum number of files to pull in a single connection")
-            .defaultValue("100")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
+        .name("Max Selects")
+        .description("The maximum number of files to pull in a single connection")
+        .defaultValue("100")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
     public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Remote Poll Batch Size")
-            .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
-                    + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
-                    + "be critical.  Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
-                    + "than normal.")
-            .defaultValue("5000")
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .required(true)
-            .build();
+        .name("Remote Poll Batch Size")
+        .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
+            + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
+            + "be critical.  Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
+            + "than normal.")
+        .defaultValue("5000")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .required(true)
+        .build();
     public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
-            .name("Delete Original")
-            .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
-            .defaultValue("true")
-            .allowableValues("true", "false")
-            .required(true)
-            .build();
+        .name("Delete Original")
+        .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
+        .defaultValue("true")
+        .allowableValues("true", "false")
+        .required(true)
+        .build();
     public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
-            .name("Polling Interval")
-            .description("Determines how long to wait between fetching the listing for new files")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .required(true)
-            .defaultValue("60 sec")
-            .build();
+        .name("Polling Interval")
+        .description("Determines how long to wait between fetching the listing for new files")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .required(true)
+        .defaultValue("60 sec")
+        .build();
     public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
-            .name("Ignore Dotted Files")
-            .description("If true, files whose names begin with a dot (\".\") will be ignored")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .required(true)
-            .build();
+        .name("Ignore Dotted Files")
+        .description("If true, files whose names begin with a dot (\".\") will be ignored")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
     public static final PropertyDescriptor USE_NATURAL_ORDERING = new PropertyDescriptor.Builder()
-            .name("Use Natural Ordering")
-            .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
+        .name("Use Natural Ordering")
+        .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     // PUT-specific properties
     public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@@ -183,77 +186,77 @@ public interface FileTransfer extends Closeable {
     public static final String CONFLICT_RESOLUTION_FAIL = "FAIL";
     public static final String CONFLICT_RESOLUTION_NONE = "NONE";
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
-            .name("Conflict Resolution")
-            .description("Determines how to handle the problem of filename collisions")
-            .required(true)
-            .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
-            .defaultValue(CONFLICT_RESOLUTION_NONE)
-            .build();
+        .name("Conflict Resolution")
+        .description("Determines how to handle the problem of filename collisions")
+        .required(true)
+        .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
+        .defaultValue(CONFLICT_RESOLUTION_NONE)
+        .build();
     public static final PropertyDescriptor REJECT_ZERO_BYTE = new PropertyDescriptor.Builder()
-            .name("Reject Zero-Byte Files")
-            .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+        .name("Reject Zero-Byte Files")
+        .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
     public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder()
-            .name("Dot Rename")
-            .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
-                    + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
-                    + "Temporary Filename property is set.")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+        .name("Dot Rename")
+        .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
+            + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
+            + "Temporary Filename property is set.")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
     public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder()
-            .name("Temporary Filename")
-            .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
-                    + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .required(false)
-            .build();
+        .name("Temporary Filename")
+        .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
+            + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
     public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
-            .name("Last Modified Time")
-            .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
-                    + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
-                    + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Last Modified Time")
+        .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
+            + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
+            + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
-            .name("Permissions")
-            .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
-                    + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
-                    + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
-                    + "fail to change permissions of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Permissions")
+        .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
+            + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
+            + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
+            + "fail to change permissions of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
-            .name("Remote Owner")
-            .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
-                    + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
-                    + "will fail to change the owner of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Remote Owner")
+        .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
+            + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
+            + "will fail to change the owner of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
-            .name("Remote Group")
-            .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
-                    + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
-                    + "will fail to change the group of the file.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("Remote Group")
+        .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
+            + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
+            + "will fail to change the group of the file.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The maximum number of FlowFiles to send in a single connection")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("500")
-            .build();
+        .name("Batch Size")
+        .description("The maximum number of FlowFiles to send in a single connection")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("500")
+        .build();
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
new file mode 100644
index 0000000..6e019ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+public interface ListableEntity {
+
+    /**
+     * @return The name of the remote entity
+     */
+    String getName();
+
+    /**
+     * @return the identifier of the remote entity. This may or may not be the same as the name of the
+     *         entity but should be unique across all entities.
+     */
+    String getIdentifier();
+
+
+    /**
+     * @return the timestamp for this entity so that we can be efficient about not performing listings of the same
+     *         entities multiple times
+     */
+    long getTimestamp();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
new file mode 100644
index 0000000..465995e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+
+public class PermissionDeniedException extends IOException {
+    private static final long serialVersionUID = -6215434916883053982L;
+
+    public PermissionDeniedException(final String message) {
+        super(message);
+    }
+
+    public PermissionDeniedException(final String message, final Throwable t) {
+        super(message, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 19955e7..c28f275 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard.util;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
@@ -51,45 +52,45 @@ import com.jcraft.jsch.SftpException;
 public class SFTPTransfer implements FileTransfer {
 
     public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
-            .name("Private Key Path")
-            .description("The fully qualified path to the Private Key file")
-            .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .build();
+        .name("Private Key Path")
+        .description("The fully qualified path to the Private Key file")
+        .required(false)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .build();
     public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder()
-            .name("Private Key Passphrase")
-            .description("Password for the private key")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
+        .name("Private Key Passphrase")
+        .description("Password for the private key")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .sensitive(true)
+        .build();
     public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder()
-            .name("Host Key File")
-            .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .required(false)
-            .build();
+        .name("Host Key File")
+        .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .required(false)
+        .build();
     public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder()
-            .name("Strict Host Key Checking")
-            .description("Indicates whether or not strict enforcement of hosts keys should be applied")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
+        .name("Strict Host Key Checking")
+        .description("Indicates whether or not strict enforcement of hosts keys should be applied")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
-            .name("Port")
-            .description("The port that the remote system is listening on for file transfers")
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .required(true)
-            .defaultValue("22")
-            .build();
+        .name("Port")
+        .description("The port that the remote system is listening on for file transfers")
+        .addValidator(StandardValidators.PORT_VALIDATOR)
+        .required(true)
+        .defaultValue("22")
+        .build();
     public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Send Keep Alive On Timeout")
-            .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .required(true)
-            .build();
+        .name("Send Keep Alive On Timeout")
+        .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
 
     /**
      * Dynamic property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
@@ -99,12 +100,12 @@ public class SFTPTransfer implements FileTransfer {
      * This property is dynamic until deemed a worthy inclusion as proper.
      */
     public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder()
-            .name("Disable Directory Listing")
-            .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .dynamic(true)
-            .defaultValue("false")
-            .build();
+        .name("Disable Directory Listing")
+        .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+        .dynamic(true)
+        .defaultValue("false")
+        .build();
 
     private final ProcessorLog logger;
 
@@ -133,7 +134,16 @@ public class SFTPTransfer implements FileTransfer {
     public List<FileInfo> getListing() throws IOException {
         final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
         final int depth = 0;
-        final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
+
+        final int maxResults;
+        final PropertyValue batchSizeValue = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE);
+        if (batchSizeValue == null) {
+            maxResults = Integer.MAX_VALUE;
+        } else {
+            final Integer configuredValue = batchSizeValue.asInteger();
+            maxResults = configuredValue == null ? Integer.MAX_VALUE : configuredValue;
+        }
+
         final List<FileInfo> listing = new ArrayList<>(1000);
         getListing(path, depth, maxResults, listing);
         return listing;
@@ -222,7 +232,15 @@ public class SFTPTransfer implements FileTransfer {
                 sftp.ls(path, filter);
             }
         } catch (final SftpException e) {
-            throw new IOException("Failed to obtain file listing for " + (path == null ? "current directory" : path), e);
+            final String pathDesc = path == null ? "current directory" : path;
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server");
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions");
+                default:
+                    throw new IOException("Failed to obtain file listing for " + pathDesc, e);
+            }
         }
 
         for (final LsEntry entry : subDirs) {
@@ -251,24 +269,36 @@ public class SFTPTransfer implements FileTransfer {
         }
 
         FileInfo.Builder builder = new FileInfo.Builder()
-                .filename(entry.getFilename())
-                .fullPathFileName(newFullForwardPath)
-                .directory(entry.getAttrs().isDir())
-                .size(entry.getAttrs().getSize())
-                .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
-                .permissions(perms)
-                .owner(Integer.toString(entry.getAttrs().getUId()))
-                .group(Integer.toString(entry.getAttrs().getGId()));
+            .filename(entry.getFilename())
+            .fullPathFileName(newFullForwardPath)
+            .directory(entry.getAttrs().isDir())
+            .size(entry.getAttrs().getSize())
+            .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
+            .permissions(perms)
+            .owner(Integer.toString(entry.getAttrs().getUId()))
+            .group(Integer.toString(entry.getAttrs().getGId()));
         return builder.build();
     }
 
     @Override
     public InputStream getInputStream(final String remoteFileName) throws IOException {
-        final ChannelSftp sftp = getChannel(null);
+        return getInputStream(remoteFileName, null);
+    }
+
+    @Override
+    public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
+        final ChannelSftp sftp = getChannel(flowFile);
         try {
             return sftp.get(remoteFileName);
         } catch (final SftpException e) {
-            throw new IOException("Failed to obtain file content for " + remoteFileName, e);
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server");
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e);
+                default:
+                    throw new IOException("Failed to obtain file content for " + remoteFileName, e);
+            }
         }
     }
 
@@ -283,7 +313,14 @@ public class SFTPTransfer implements FileTransfer {
         try {
             sftp.rm(fullPath);
         } catch (final SftpException e) {
-            throw new IOException("Failed to delete remote file " + fullPath, e);
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server");
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
+                default:
+                    throw new IOException("Failed to delete remote file " + fullPath, e);
+            }
         }
     }
 
@@ -333,10 +370,10 @@ public class SFTPTransfer implements FileTransfer {
             if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
                 ensureDirectoryExists(flowFile, directoryName.getParentFile());
             }
-            logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory});
+            logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
             try {
                 channel.mkdir(remoteDirectory);
-                logger.debug("Created {}", new Object[]{remoteDirectory});
+                logger.debug("Created {}", new Object[] {remoteDirectory});
             } catch (final SftpException e) {
                 throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
             }
@@ -358,9 +395,9 @@ public class SFTPTransfer implements FileTransfer {
 
         final JSch jsch = new JSch();
         try {
-            final Session session = jsch.getSession(ctx.getProperty(USERNAME).getValue(),
-                    ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
-                    ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
+            final Session session = jsch.getSession(ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(),
+                ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
+                ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
 
             final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
             if (hostKeyVal != null) {
@@ -371,7 +408,8 @@ public class SFTPTransfer implements FileTransfer {
             properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no");
             properties.setProperty("PreferredAuthentications", "publickey,password");
 
-            if (ctx.getProperty(FileTransfer.USE_COMPRESSION).asBoolean()) {
+            final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
+            if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
                 properties.setProperty("compression.s2c", "zlib@openssh.com,zlib,none");
                 properties.setProperty("compression.c2s", "zlib@openssh.com,zlib,none");
             } else {
@@ -381,12 +419,12 @@ public class SFTPTransfer implements FileTransfer {
 
             session.setConfig(properties);
 
-            final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).getValue();
+            final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
             if (privateKeyFile != null) {
-                jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).getValue());
+                jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue());
             }
 
-            final String password = ctx.getProperty(FileTransfer.PASSWORD).getValue();
+            final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
             if (password != null) {
                 session.setPassword(password);
             }
@@ -428,7 +466,7 @@ public class SFTPTransfer implements FileTransfer {
                 sftp.exit();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close ChannelSftp due to {}", new Object[]{ex.toString()}, ex);
+            logger.warn("Failed to close ChannelSftp due to {}", new Object[] {ex.toString()}, ex);
         }
         sftp = null;
 
@@ -437,7 +475,7 @@ public class SFTPTransfer implements FileTransfer {
                 session.disconnect();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close session due to {}", new Object[]{ex.toString()}, ex);
+            logger.warn("Failed to close session due to {}", new Object[] {ex.toString()}, ex);
         }
         session = null;
     }
@@ -515,7 +553,7 @@ public class SFTPTransfer implements FileTransfer {
                 int time = (int) (fileModifyTime.getTime() / 1000L);
                 sftp.setMtime(tempPath, time);
             } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e});
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
             }
         }
 
@@ -527,7 +565,7 @@ public class SFTPTransfer implements FileTransfer {
                     sftp.chmod(perms, tempPath);
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e});
+                logger.error("Failed to set permission on {} to {} due to {}", new Object[] {tempPath, permissions, e});
             }
         }
 
@@ -536,7 +574,7 @@ public class SFTPTransfer implements FileTransfer {
             try {
                 sftp.chown(Integer.parseInt(owner), tempPath);
             } catch (final Exception e) {
-                logger.error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e});
+                logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e});
             }
         }
 
@@ -545,7 +583,7 @@ public class SFTPTransfer implements FileTransfer {
             try {
                 sftp.chgrp(Integer.parseInt(group), tempPath);
             } catch (final Exception e) {
-                logger.error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e});
+                logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e});
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ff39ad3..b12fb6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -28,6 +28,7 @@ org.apache.nifi.processors.standard.EvaluateXQuery
 org.apache.nifi.processors.standard.ExecuteStreamCommand
 org.apache.nifi.processors.standard.ExecuteProcess
 org.apache.nifi.processors.standard.ExtractText
+org.apache.nifi.processors.standard.FetchSFTP
 org.apache.nifi.processors.standard.GenerateFlowFile
 org.apache.nifi.processors.standard.GetFile
 org.apache.nifi.processors.standard.GetFTP
@@ -43,6 +44,7 @@ org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
 org.apache.nifi.processors.standard.ListenHTTP
 org.apache.nifi.processors.standard.ListenUDP
+org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
 org.apache.nifi.processors.standard.MergeContent
 org.apache.nifi.processors.standard.ModifyBytes

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
new file mode 100644
index 0000000..ba84939
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestAbstractListProcessor {
+
+    @Test
+    public void testOnlyNewEntriesEmitted() {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", 1492L);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", 1491L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1492L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", 1493L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id", 1494L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testStateStoredInDistributedService() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        runner.run();
+
+        proc.addEntity("name", "id", 1492L);
+        runner.run();
+
+        assertEquals(1, cache.stored.size());
+    }
+
+    @Test
+    public void testFetchOnStart() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        runner.run();
+
+        assertEquals(1, cache.fetchCount);
+    }
+
+    private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
+        private final Map<Object, Object> stored = new HashMap<>();
+        private int fetchCount = 0;
+
+        @Override
+        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            return false;
+        }
+
+        @Override
+        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+            return null;
+        }
+
+        @Override
+        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+            return false;
+        }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            stored.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+            fetchCount++;
+            return (V) stored.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+            final Object value = stored.remove(key);
+            return value != null;
+        }
+    }
+
+
+    private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
+        private final List<ListableEntity> entities = new ArrayList<>();
+
+        @Override
+        protected File getPersistenceFile() {
+            return new File("target/ListProcessor-local-state.json");
+        }
+
+        public void addEntity(final String name, final String identifier, final long timestamp) {
+            final ListableEntity entity = new ListableEntity() {
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public String getIdentifier() {
+                    return identifier;
+                }
+
+                @Override
+                public long getTimestamp() {
+                    return timestamp;
+                }
+            };
+
+            entities.add(entity);
+        }
+
+        @Override
+        protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        protected String getPath(final ProcessContext context) {
+            return "/path";
+        }
+
+        @Override
+        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+            return Collections.unmodifiableList(entities);
+        }
+
+        @Override
+        protected boolean isListingResetNecessary(PropertyDescriptor property) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
new file mode 100644
index 0000000..7aa8f9c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.PermissionDeniedException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestFetchFileTransfer {
+
+    @Test
+    public void testContentFetched() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertFalse(proc.closed);
+        runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
+    }
+
+    @Test
+    public void testContentNotFound() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
+    }
+
+    @Test
+    public void testInsufficientPermissions() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        proc.allowAccess = false;
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
+    }
+
+    private static class TestableFetchFileTransfer extends FetchFileTransfer {
+        private boolean allowAccess = true;
+        private boolean closed = false;
+        private final Map<String, byte[]> fileContents = new HashMap<>();
+
+        public void addContent(final String filename, final byte[] content) {
+            this.fileContents.put(filename, content);
+        }
+
+        @Override
+        protected FileTransfer createFileTransfer(final ProcessContext context) {
+            return new FileTransfer() {
+                @Override
+                public void close() throws IOException {
+                    closed = true;
+                }
+
+                @Override
+                public String getHomeDirectory(FlowFile flowFile) throws IOException {
+                    return null;
+                }
+
+                @Override
+                public List<FileInfo> getListing() throws IOException {
+                    return null;
+                }
+
+                @Override
+                public InputStream getInputStream(final String remoteFileName) throws IOException {
+                    return getInputStream(remoteFileName, null);
+                }
+
+                @Override
+                public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
+                    if (!allowAccess) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
+
+                    final byte[] content = fileContents.get(remoteFileName);
+                    if (content == null) {
+                        throw new FileNotFoundException();
+                    }
+
+                    return new ByteArrayInputStream(content);
+                }
+
+                @Override
+                public void flush() throws IOException {
+                }
+
+                @Override
+                public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
+                    return null;
+                }
+
+                @Override
+                public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException {
+                    return null;
+                }
+
+                @Override
+                public void deleteFile(String path, String remoteFileName) throws IOException {
+                    if (!fileContents.containsKey(remoteFileName)) {
+                        throw new FileNotFoundException();
+                    }
+
+                    fileContents.remove(remoteFileName);
+                }
+
+                @Override
+                public void deleteDirectory(String remoteDirectoryName) throws IOException {
+
+                }
+
+                @Override
+                public boolean isClosed() {
+                    return false;
+                }
+
+                @Override
+                public String getProtocolName() {
+                    return "test";
+                }
+
+                @Override
+                public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
+
+                }
+            };
+        }
+    }
+}