You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:48 UTC
[22/50] [abbrv] nifi git commit: NIFI-673: Initial implementation of
ListSFTP, FetchSFTP
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index f0061b8..fe277df 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.util.StandardValidators;
@@ -34,6 +35,8 @@ public interface FileTransfer extends Closeable {
InputStream getInputStream(String remoteFileName) throws IOException;
+ InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException;
+
void flush() throws IOException;
FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException;
@@ -51,127 +54,127 @@ public interface FileTransfer extends Closeable {
void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
- .name("Hostname")
- .description("The fully qualified hostname or IP address of the remote system")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(true)
- .expressionLanguageSupported(true)
- .build();
+ .name("Hostname")
+ .description("The fully qualified hostname or IP address of the remote system")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .expressionLanguageSupported(true)
+ .build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
- .name("Username")
- .description("Username")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(true)
- .build();
+ .name("Username")
+ .description("Username")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
- .name("Password")
- .description("Password for the user account")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(false)
- .sensitive(true)
- .build();
+ .name("Password")
+ .description("Password for the user account")
+ .addValidator(Validator.VALID)
+ .required(false)
+ .sensitive(true)
+ .build();
public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Data Timeout")
- .description("Amount of time to wait before timing out while transferring data")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("30 sec")
- .build();
+ .name("Data Timeout")
+ .description("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 sec")
+ .build();
public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Connection Timeout")
- .description("Amount of time to wait before timing out while creating a connection")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("30 sec")
- .build();
+ .name("Connection Timeout")
+ .description("Amount of time to wait before timing out while creating a connection")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 sec")
+ .build();
public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
- .name("Remote Path")
- .description("The path on the remote system from which to pull or push files")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("Remote Path")
+ .description("The path on the remote system from which to pull or push files")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
public static final PropertyDescriptor CREATE_DIRECTORY = new PropertyDescriptor.Builder()
- .name("Create Directory")
- .description("Specifies whether or not the remote directory should be created if it does not exist.")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("false")
- .build();
+ .name("Create Directory")
+ .description("Specifies whether or not the remote directory should be created if it does not exist.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
public static final PropertyDescriptor USE_COMPRESSION = new PropertyDescriptor.Builder()
- .name("Use Compression")
- .description("Indicates whether or not ZLIB compression should be used when transferring files")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
+ .name("Use Compression")
+ .description("Indicates whether or not ZLIB compression should be used when transferring files")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
// GET-specific properties
public static final PropertyDescriptor RECURSIVE_SEARCH = new PropertyDescriptor.Builder()
- .name("Search Recursively")
- .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
- .required(true)
- .defaultValue("false")
- .allowableValues("true", "false")
- .build();
+ .name("Search Recursively")
+ .description("If true, will pull files from arbitrarily nested subdirectories; otherwise, will not traverse subdirectories")
+ .required(true)
+ .defaultValue("false")
+ .allowableValues("true", "false")
+ .build();
public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
- .name("File Filter Regex")
- .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
- .required(false)
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .build();
+ .name("File Filter Regex")
+ .description("Provides a Java Regular Expression for filtering Filenames; if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
public static final PropertyDescriptor PATH_FILTER_REGEX = new PropertyDescriptor.Builder()
- .name("Path Filter Regex")
- .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
- .required(false)
- .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .build();
+ .name("Path Filter Regex")
+ .description("When " + RECURSIVE_SEARCH.getName() + " is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
public static final PropertyDescriptor MAX_SELECTS = new PropertyDescriptor.Builder()
- .name("Max Selects")
- .description("The maximum number of files to pull in a single connection")
- .defaultValue("100")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .build();
+ .name("Max Selects")
+ .description("The maximum number of files to pull in a single connection")
+ .defaultValue("100")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
public static final PropertyDescriptor REMOTE_POLL_BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Remote Poll Batch Size")
- .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
- + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
- + "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
- + "than normal.")
- .defaultValue("5000")
- .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .required(true)
- .build();
+ .name("Remote Poll Batch Size")
+ .description("The value specifies how many file paths to find in a given directory on the remote system when doing a file listing. This value "
+ + "in general should not need to be modified but when polling against a remote system with a tremendous number of files this value can "
+ + "be critical. Setting this value too high can result very poor performance and setting it too low can cause the flow to be slower "
+ + "than normal.")
+ .defaultValue("5000")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .required(true)
+ .build();
public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
- .name("Delete Original")
- .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
- .defaultValue("true")
- .allowableValues("true", "false")
- .required(true)
- .build();
+ .name("Delete Original")
+ .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .required(true)
+ .build();
public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
- .name("Polling Interval")
- .description("Determines how long to wait between fetching the listing for new files")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .required(true)
- .defaultValue("60 sec")
- .build();
+ .name("Polling Interval")
+ .description("Determines how long to wait between fetching the listing for new files")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .defaultValue("60 sec")
+ .build();
public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
- .name("Ignore Dotted Files")
- .description("If true, files whose names begin with a dot (\".\") will be ignored")
- .allowableValues("true", "false")
- .defaultValue("true")
- .required(true)
- .build();
+ .name("Ignore Dotted Files")
+ .description("If true, files whose names begin with a dot (\".\") will be ignored")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
public static final PropertyDescriptor USE_NATURAL_ORDERING = new PropertyDescriptor.Builder()
- .name("Use Natural Ordering")
- .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
+ .name("Use Natural Ordering")
+ .description("If true, will pull files in the order in which they are naturally listed; otherwise, the order in which the files will be pulled is not defined")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
// PUT-specific properties
public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@@ -183,77 +186,77 @@ public interface FileTransfer extends Closeable {
public static final String CONFLICT_RESOLUTION_FAIL = "FAIL";
public static final String CONFLICT_RESOLUTION_NONE = "NONE";
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
- .name("Conflict Resolution")
- .description("Determines how to handle the problem of filename collisions")
- .required(true)
- .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
- .defaultValue(CONFLICT_RESOLUTION_NONE)
- .build();
+ .name("Conflict Resolution")
+ .description("Determines how to handle the problem of filename collisions")
+ .required(true)
+ .allowableValues(CONFLICT_RESOLUTION_REPLACE, CONFLICT_RESOLUTION_IGNORE, CONFLICT_RESOLUTION_RENAME, CONFLICT_RESOLUTION_REJECT, CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_NONE)
+ .defaultValue(CONFLICT_RESOLUTION_NONE)
+ .build();
public static final PropertyDescriptor REJECT_ZERO_BYTE = new PropertyDescriptor.Builder()
- .name("Reject Zero-Byte Files")
- .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
+ .name("Reject Zero-Byte Files")
+ .description("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
public static final PropertyDescriptor DOT_RENAME = new PropertyDescriptor.Builder()
- .name("Dot Rename")
- .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
- + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
- + "Temporary Filename property is set.")
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
+ .name("Dot Rename")
+ .description("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the "
+ + "original once the file is completely sent. Otherwise, there is no rename. This property is ignored if the "
+ + "Temporary Filename property is set.")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
public static final PropertyDescriptor TEMP_FILENAME = new PropertyDescriptor.Builder()
- .name("Temporary Filename")
- .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
- + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .required(false)
- .build();
+ .name("Temporary Filename")
+ .description("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful "
+ + "completion will be renamed to the original filename. If this value is set, the Dot Rename property is ignored.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(false)
+ .build();
public static final PropertyDescriptor LAST_MODIFIED_TIME = new PropertyDescriptor.Builder()
- .name("Last Modified Time")
- .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
- + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
- + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("Last Modified Time")
+ .description("The lastModifiedTime to assign to the file after transferring it. If not set, the lastModifiedTime will not be changed. "
+ + "Format must be yyyy-MM-dd'T'HH:mm:ssZ. You may also use expression language such as ${file.lastModifiedTime}. If the value "
+ + "is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
public static final PropertyDescriptor PERMISSIONS = new PropertyDescriptor.Builder()
- .name("Permissions")
- .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
- + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
- + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
- + "fail to change permissions of the file.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("Permissions")
+ .description("The permissions to assign to the file after transferring it. Format must be either UNIX rwxrwxrwx with a - in place of "
+ + "denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). If not set, the permissions will not be changed. You may "
+ + "also use expression language such as ${file.permissions}. If the value is invalid, the processor will not be invalid but will "
+ + "fail to change permissions of the file.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder()
- .name("Remote Owner")
- .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
- + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
- + "will fail to change the owner of the file.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("Remote Owner")
+ .description("Integer value representing the User ID to set on the file after transferring it. If not set, the owner will not be set. "
+ + "You may also use expression language such as ${file.owner}. If the value is invalid, the processor will not be invalid but "
+ + "will fail to change the owner of the file.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
- .name("Remote Group")
- .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
- + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
- + "will fail to change the group of the file.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
- .build();
+ .name("Remote Group")
+ .description("Integer value representing the Group ID to set on the file after transferring it. If not set, the group will not be set. "
+ + "You may also use expression language such as ${file.group}. If the value is invalid, the processor will not be invalid but "
+ + "will fail to change the group of the file.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of FlowFiles to send in a single connection")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("500")
- .build();
+ .name("Batch Size")
+ .description("The maximum number of FlowFiles to send in a single connection")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("500")
+ .build();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
new file mode 100644
index 0000000..6e019ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+public interface ListableEntity {
+
+ /**
+ * @return The name of the remote entity
+ */
+ String getName();
+
+ /**
+ * @return the identifier of the remote entity. This may or may not be the same as the name of the
+ * entity but should be unique across all entities.
+ */
+ String getIdentifier();
+
+
+ /**
+ * @return the timestamp for this entity so that we can be efficient about not performing listings of the same
+ * entities multiple times
+ */
+ long getTimestamp();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
new file mode 100644
index 0000000..465995e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/PermissionDeniedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+
+public class PermissionDeniedException extends IOException {
+ private static final long serialVersionUID = -6215434916883053982L;
+
+ public PermissionDeniedException(final String message) {
+ super(message);
+ }
+
+ public PermissionDeniedException(final String message, final Throwable t) {
+ super(message, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index 19955e7..c28f275 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard.util;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
@@ -51,45 +52,45 @@ import com.jcraft.jsch.SftpException;
public class SFTPTransfer implements FileTransfer {
public static final PropertyDescriptor PRIVATE_KEY_PATH = new PropertyDescriptor.Builder()
- .name("Private Key Path")
- .description("The fully qualified path to the Private Key file")
- .required(false)
- .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
- .build();
+ .name("Private Key Path")
+ .description("The fully qualified path to the Private Key file")
+ .required(false)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .build();
public static final PropertyDescriptor PRIVATE_KEY_PASSPHRASE = new PropertyDescriptor.Builder()
- .name("Private Key Passphrase")
- .description("Password for the private key")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
+ .name("Private Key Passphrase")
+ .description("Password for the private key")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
public static final PropertyDescriptor HOST_KEY_FILE = new PropertyDescriptor.Builder()
- .name("Host Key File")
- .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
- .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
- .required(false)
- .build();
+ .name("Host Key File")
+ .description("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .required(false)
+ .build();
public static final PropertyDescriptor STRICT_HOST_KEY_CHECKING = new PropertyDescriptor.Builder()
- .name("Strict Host Key Checking")
- .description("Indicates whether or not strict enforcement of hosts keys should be applied")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
+ .name("Strict Host Key Checking")
+ .description("Indicates whether or not strict enforcement of hosts keys should be applied")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Port")
- .description("The port that the remote system is listening on for file transfers")
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .required(true)
- .defaultValue("22")
- .build();
+ .name("Port")
+ .description("The port that the remote system is listening on for file transfers")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .required(true)
+ .defaultValue("22")
+ .build();
public static final PropertyDescriptor USE_KEEPALIVE_ON_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Send Keep Alive On Timeout")
- .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
- .allowableValues("true", "false")
- .defaultValue("true")
- .required(true)
- .build();
+ .name("Send Keep Alive On Timeout")
+ .description("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
/**
* Dynamic property which is used to decide if the {@link #ensureDirectoryExists(FlowFile, File)} method should perform a {@link ChannelSftp#ls(String)} before calling
@@ -99,12 +100,12 @@ public class SFTPTransfer implements FileTransfer {
* This property is dynamic until deemed a worthy inclusion as proper.
*/
public static final PropertyDescriptor DISABLE_DIRECTORY_LISTING = new PropertyDescriptor.Builder()
- .name("Disable Directory Listing")
- .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .dynamic(true)
- .defaultValue("false")
- .build();
+ .name("Disable Directory Listing")
+ .description("Disables directory listings before operations which might fail, such as configurations which create directory structures.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .dynamic(true)
+ .defaultValue("false")
+ .build();
private final ProcessorLog logger;
@@ -133,7 +134,16 @@ public class SFTPTransfer implements FileTransfer {
public List<FileInfo> getListing() throws IOException {
final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
final int depth = 0;
- final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
+
+ final int maxResults;
+ final PropertyValue batchSizeValue = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE);
+ if (batchSizeValue == null) {
+ maxResults = Integer.MAX_VALUE;
+ } else {
+ final Integer configuredValue = batchSizeValue.asInteger();
+ maxResults = configuredValue == null ? Integer.MAX_VALUE : configuredValue;
+ }
+
final List<FileInfo> listing = new ArrayList<>(1000);
getListing(path, depth, maxResults, listing);
return listing;
@@ -222,7 +232,15 @@ public class SFTPTransfer implements FileTransfer {
sftp.ls(path, filter);
}
} catch (final SftpException e) {
- throw new IOException("Failed to obtain file listing for " + (path == null ? "current directory" : path), e);
+ final String pathDesc = path == null ? "current directory" : path;
+ switch (e.id) {
+ case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+ throw new FileNotFoundException("Could not perform listing on " + pathDesc + " because could not find the file on the remote server");
+ case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+ throw new PermissionDeniedException("Could not perform listing on " + pathDesc + " due to insufficient permissions");
+ default:
+ throw new IOException("Failed to obtain file listing for " + pathDesc, e);
+ }
}
for (final LsEntry entry : subDirs) {
@@ -251,24 +269,36 @@ public class SFTPTransfer implements FileTransfer {
}
FileInfo.Builder builder = new FileInfo.Builder()
- .filename(entry.getFilename())
- .fullPathFileName(newFullForwardPath)
- .directory(entry.getAttrs().isDir())
- .size(entry.getAttrs().getSize())
- .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
- .permissions(perms)
- .owner(Integer.toString(entry.getAttrs().getUId()))
- .group(Integer.toString(entry.getAttrs().getGId()));
+ .filename(entry.getFilename())
+ .fullPathFileName(newFullForwardPath)
+ .directory(entry.getAttrs().isDir())
+ .size(entry.getAttrs().getSize())
+ .lastModifiedTime(entry.getAttrs().getMTime() * 1000L)
+ .permissions(perms)
+ .owner(Integer.toString(entry.getAttrs().getUId()))
+ .group(Integer.toString(entry.getAttrs().getGId()));
return builder.build();
}
@Override
public InputStream getInputStream(final String remoteFileName) throws IOException {
- final ChannelSftp sftp = getChannel(null);
+ return getInputStream(remoteFileName, null);
+ }
+
+ @Override
+ public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException {
+ final ChannelSftp sftp = getChannel(flowFile);
try {
return sftp.get(remoteFileName);
} catch (final SftpException e) {
- throw new IOException("Failed to obtain file content for " + remoteFileName, e);
+ switch (e.id) {
+ case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+ throw new FileNotFoundException("Could not find file " + remoteFileName + " on remote SFTP Server");
+ case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+ throw new PermissionDeniedException("Insufficient permissions to read file " + remoteFileName + " from remote SFTP Server", e);
+ default:
+ throw new IOException("Failed to obtain file content for " + remoteFileName, e);
+ }
}
}
@@ -283,7 +313,14 @@ public class SFTPTransfer implements FileTransfer {
try {
sftp.rm(fullPath);
} catch (final SftpException e) {
- throw new IOException("Failed to delete remote file " + fullPath, e);
+ switch (e.id) {
+ case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+ throw new FileNotFoundException("Could not find file " + remoteFileName + " to remove from remote SFTP Server");
+ case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+ throw new PermissionDeniedException("Insufficient permissions to delete file " + remoteFileName + " from remote SFTP Server", e);
+ default:
+ throw new IOException("Failed to delete remote file " + fullPath, e);
+ }
}
}
@@ -333,10 +370,10 @@ public class SFTPTransfer implements FileTransfer {
if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
ensureDirectoryExists(flowFile, directoryName.getParentFile());
}
- logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory});
+ logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
try {
channel.mkdir(remoteDirectory);
- logger.debug("Created {}", new Object[]{remoteDirectory});
+ logger.debug("Created {}", new Object[] {remoteDirectory});
} catch (final SftpException e) {
throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
}
@@ -358,9 +395,9 @@ public class SFTPTransfer implements FileTransfer {
final JSch jsch = new JSch();
try {
- final Session session = jsch.getSession(ctx.getProperty(USERNAME).getValue(),
- ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
- ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
+ final Session session = jsch.getSession(ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(),
+ ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(),
+ ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger().intValue());
final String hostKeyVal = ctx.getProperty(HOST_KEY_FILE).getValue();
if (hostKeyVal != null) {
@@ -371,7 +408,8 @@ public class SFTPTransfer implements FileTransfer {
properties.setProperty("StrictHostKeyChecking", ctx.getProperty(STRICT_HOST_KEY_CHECKING).asBoolean() ? "yes" : "no");
properties.setProperty("PreferredAuthentications", "publickey,password");
- if (ctx.getProperty(FileTransfer.USE_COMPRESSION).asBoolean()) {
+ final PropertyValue compressionValue = ctx.getProperty(FileTransfer.USE_COMPRESSION);
+ if (compressionValue != null && "true".equalsIgnoreCase(compressionValue.getValue())) {
properties.setProperty("compression.s2c", "zlib@openssh.com,zlib,none");
properties.setProperty("compression.c2s", "zlib@openssh.com,zlib,none");
} else {
@@ -381,12 +419,12 @@ public class SFTPTransfer implements FileTransfer {
session.setConfig(properties);
- final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).getValue();
+ final String privateKeyFile = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
if (privateKeyFile != null) {
- jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).getValue());
+ jsch.addIdentity(privateKeyFile, ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue());
}
- final String password = ctx.getProperty(FileTransfer.PASSWORD).getValue();
+ final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
if (password != null) {
session.setPassword(password);
}
@@ -428,7 +466,7 @@ public class SFTPTransfer implements FileTransfer {
sftp.exit();
}
} catch (final Exception ex) {
- logger.warn("Failed to close ChannelSftp due to {}", new Object[]{ex.toString()}, ex);
+ logger.warn("Failed to close ChannelSftp due to {}", new Object[] {ex.toString()}, ex);
}
sftp = null;
@@ -437,7 +475,7 @@ public class SFTPTransfer implements FileTransfer {
session.disconnect();
}
} catch (final Exception ex) {
- logger.warn("Failed to close session due to {}", new Object[]{ex.toString()}, ex);
+ logger.warn("Failed to close session due to {}", new Object[] {ex.toString()}, ex);
}
session = null;
}
@@ -515,7 +553,7 @@ public class SFTPTransfer implements FileTransfer {
int time = (int) (fileModifyTime.getTime() / 1000L);
sftp.setMtime(tempPath, time);
} catch (final Exception e) {
- logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{tempPath, lastModifiedTime, e});
+ logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {tempPath, lastModifiedTime, e});
}
}
@@ -527,7 +565,7 @@ public class SFTPTransfer implements FileTransfer {
sftp.chmod(perms, tempPath);
}
} catch (final Exception e) {
- logger.error("Failed to set permission on {} to {} due to {}", new Object[]{tempPath, permissions, e});
+ logger.error("Failed to set permission on {} to {} due to {}", new Object[] {tempPath, permissions, e});
}
}
@@ -536,7 +574,7 @@ public class SFTPTransfer implements FileTransfer {
try {
sftp.chown(Integer.parseInt(owner), tempPath);
} catch (final Exception e) {
- logger.error("Failed to set owner on {} to {} due to {}", new Object[]{tempPath, owner, e});
+ logger.error("Failed to set owner on {} to {} due to {}", new Object[] {tempPath, owner, e});
}
}
@@ -545,7 +583,7 @@ public class SFTPTransfer implements FileTransfer {
try {
sftp.chgrp(Integer.parseInt(group), tempPath);
} catch (final Exception e) {
- logger.error("Failed to set group on {} to {} due to {}", new Object[]{tempPath, group, e});
+ logger.error("Failed to set group on {} to {} due to {}", new Object[] {tempPath, group, e});
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ff39ad3..b12fb6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -28,6 +28,7 @@ org.apache.nifi.processors.standard.EvaluateXQuery
org.apache.nifi.processors.standard.ExecuteStreamCommand
org.apache.nifi.processors.standard.ExecuteProcess
org.apache.nifi.processors.standard.ExtractText
+org.apache.nifi.processors.standard.FetchSFTP
org.apache.nifi.processors.standard.GenerateFlowFile
org.apache.nifi.processors.standard.GetFile
org.apache.nifi.processors.standard.GetFTP
@@ -43,6 +44,7 @@ org.apache.nifi.processors.standard.GetJMSQueue
org.apache.nifi.processors.standard.GetJMSTopic
org.apache.nifi.processors.standard.ListenHTTP
org.apache.nifi.processors.standard.ListenUDP
+org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.MergeContent
org.apache.nifi.processors.standard.ModifyBytes
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
new file mode 100644
index 0000000..ba84939
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestAbstractListProcessor {
+
+ @Test
+ public void testOnlyNewEntriesEmitted() {
+ final ConcreteListProcessor proc = new ConcreteListProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+ proc.addEntity("name", "id", 1492L);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id2", 1492L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id2", 1492L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id3", 1491L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id2", 1492L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id2", 1493L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id2", 1493L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id2", 1493L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+ runner.clearTransferState();
+
+ proc.addEntity("name", "id", 1494L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+ runner.clearTransferState();
+ }
+
+ @Test
+ public void testStateStoredInDistributedService() throws InitializationException {
+ final ConcreteListProcessor proc = new ConcreteListProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ final DistributedCache cache = new DistributedCache();
+ runner.addControllerService("cache", cache);
+ runner.enableControllerService(cache);
+ runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+ runner.run();
+
+ proc.addEntity("name", "id", 1492L);
+ runner.run();
+
+ assertEquals(1, cache.stored.size());
+ }
+
+ @Test
+ public void testFetchOnStart() throws InitializationException {
+ final ConcreteListProcessor proc = new ConcreteListProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ final DistributedCache cache = new DistributedCache();
+ runner.addControllerService("cache", cache);
+ runner.enableControllerService(cache);
+ runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+ runner.run();
+
+ assertEquals(1, cache.fetchCount);
+ }
+
+ private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
+ private final Map<Object, Object> stored = new HashMap<>();
+ private int fetchCount = 0;
+
+ @Override
+ public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ return false;
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+ return false;
+ }
+
+ @Override
+ public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ stored.put(key, value);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ fetchCount++;
+ return (V) stored.get(key);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+ final Object value = stored.remove(key);
+ return value != null;
+ }
+ }
+
+
+ private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
+ private final List<ListableEntity> entities = new ArrayList<>();
+
+ @Override
+ protected File getPersistenceFile() {
+ return new File("target/ListProcessor-local-state.json");
+ }
+
+ public void addEntity(final String name, final String identifier, final long timestamp) {
+ final ListableEntity entity = new ListableEntity() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+ };
+
+ entities.add(entity);
+ }
+
+ @Override
+ protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ protected String getPath(final ProcessContext context) {
+ return "/path";
+ }
+
+ @Override
+ protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+ return Collections.unmodifiableList(entities);
+ }
+
+ @Override
+ protected boolean isListingResetNecessary(PropertyDescriptor property) {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
new file mode 100644
index 0000000..7aa8f9c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.util.PermissionDeniedException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestFetchFileTransfer {
+
+ @Test
+ public void testContentFetched() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+ proc.addContent("hello.txt", "world".getBytes());
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+ assertFalse(proc.closed);
+ runner.getFlowFilesForRelationship(FetchFileTransfer.REL_SUCCESS).get(0).assertContentEquals("world");
+ }
+
+ @Test
+ public void testContentNotFound() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
+ }
+
+ @Test
+ public void testInsufficientPermissions() {
+ final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+ runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+ runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+
+ proc.addContent("hello.txt", "world".getBytes());
+ proc.allowAccess = false;
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "hello.txt");
+ runner.enqueue(new byte[0], attrs);
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
+ }
+
+ private static class TestableFetchFileTransfer extends FetchFileTransfer {
+ private boolean allowAccess = true;
+ private boolean closed = false;
+ private final Map<String, byte[]> fileContents = new HashMap<>();
+
+ public void addContent(final String filename, final byte[] content) {
+ this.fileContents.put(filename, content);
+ }
+
+ @Override
+ protected FileTransfer createFileTransfer(final ProcessContext context) {
+ return new FileTransfer() {
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+ @Override
+ public String getHomeDirectory(FlowFile flowFile) throws IOException {
+ return null;
+ }
+
+ @Override
+ public List<FileInfo> getListing() throws IOException {
+ return null;
+ }
+
+ @Override
+ public InputStream getInputStream(final String remoteFileName) throws IOException {
+ return getInputStream(remoteFileName, null);
+ }
+
+ @Override
+ public InputStream getInputStream(String remoteFileName, FlowFile flowFile) throws IOException {
+ if (!allowAccess) {
+ throw new PermissionDeniedException("test permission denied");
+ }
+
+ final byte[] content = fileContents.get(remoteFileName);
+ if (content == null) {
+ throw new FileNotFoundException();
+ }
+
+ return new ByteArrayInputStream(content);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void deleteFile(String path, String remoteFileName) throws IOException {
+ if (!fileContents.containsKey(remoteFileName)) {
+ throw new FileNotFoundException();
+ }
+
+ fileContents.remove(remoteFileName);
+ }
+
+ @Override
+ public void deleteDirectory(String remoteDirectoryName) throws IOException {
+
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ public String getProtocolName() {
+ return "test";
+ }
+
+ @Override
+ public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
+
+ }
+ };
+ }
+ }
+}