You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/10/30 15:29:50 UTC

[24/50] [abbrv] nifi git commit: NIFI-673: Added Completion Strategy to FetchSFTP

NIFI-673: Added Completion Strategy to FetchSFTP


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b0322d9f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b0322d9f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b0322d9f

Branch: refs/heads/NIFI-655
Commit: b0322d9ffe8d117aae4faf7dd3e2881a28940f96
Parents: d1d5793
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 5 16:11:40 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 25 11:13:02 2015 -0400

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         |  38 +++---
 .../processors/standard/FetchFileTransfer.java  |  66 ++++++++--
 .../nifi/processors/standard/FetchSFTP.java     |  15 ++-
 .../processors/standard/ListFileTransfer.java   |   8 ++
 .../nifi/processors/standard/ListSFTP.java      |   7 +-
 .../processors/standard/util/EntityListing.java |   2 +-
 .../processors/standard/util/FTPTransfer.java   |  26 ++--
 .../processors/standard/util/FileTransfer.java  |   2 +
 .../processors/standard/util/SFTPTransfer.java  |  17 +++
 .../standard/TestFetchFileTransfer.java         | 131 +++++++++++++++++++
 10 files changed, 265 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 8a7fade..e592483 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -70,23 +70,25 @@ import org.codehaus.jackson.map.ObjectMapper;
  *
  * <p>
  * In order to make use of this abstract class, the entities listed must meet the following criteria:
- * <ul>
- * <li>
- * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
- * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
- * </li>
- * <li>
- * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
- * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
- * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
- * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
- * seen already.
- * </li>
- * <li>
- * Entity must have a user-readable name that can be used for logging purposes.
- * </li>
  * </p>
  *
+ * <ul>
+ *  <li>
+ *      Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
+ *      returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
+ *  </li>
+ *  <li>
+ *      Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
+ *      new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+ *      than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
+ *      identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
+ *      seen already.
+ *  </li>
+ *  <li>
+ *      Entity must have a user-readable name that can be used for logging purposes.
+ *  </li>
+ * </ul>
+ *
  * <p>
  * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using
  * two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is
@@ -111,6 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper;
  *
  * <p>
  * Subclasses are responsible for the following:
+ * </p>
  *
  * <ul>
  * <li>
@@ -134,7 +137,6 @@ import org.codehaus.jackson.map.ObjectMapper;
  * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
  * </li>
  * </ul>
- * </p>
  */
 @TriggerSerially
 public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
@@ -372,8 +374,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         int listCount = 0;
         Long latestListingTimestamp = null;
         for (final T entity : entityList) {
-            final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp ||
-                (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
+            final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp
+                || (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier())));
 
             // Create the FlowFile for this path.
             if (list) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
index 5eecac3..ab0be78 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java
@@ -31,7 +31,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -49,9 +51,18 @@ import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
 
 /**
- * A base class for FetchSFTP, FetchFTP processors
+ * A base class for FetchSFTP, FetchFTP processors.
+ *
+ * Note that implementations of this class should never use the @SupportsBatching annotation! Doing so
+ * could result in data loss!
  */
 public abstract class FetchFileTransfer extends AbstractProcessor {
+
+    static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
+    static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
+    static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
+
+
     static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
         .name("Hostname")
         .description("The fully-qualified hostname or IP address of the host to fetch the data from")
@@ -73,13 +84,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(true)
         .build();
-    public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder()
-        .name("Delete Original")
-        .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred")
-        .defaultValue("true")
-        .allowableValues("true", "false")
+    static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Completion Strategy")
+        .description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be "
+            + "logged but the data will still be transferred.")
+        .expressionLanguageSupported(false)
+        .allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE)
+        .defaultValue(COMPLETION_NONE.getValue())
         .required(true)
         .build();
+    static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
+        .name("Move Destination Directory")
+        .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
+            + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
+            + "the remote system, or the rename will fail.")
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
+
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -156,7 +179,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         properties.add(HOSTNAME);
         properties.add(UNDEFAULTED_PORT);
         properties.add(REMOTE_FILENAME);
-        properties.add(DELETE_ORIGINAL);
+        properties.add(COMPLETION_STRATEGY);
+        properties.add(MOVE_DESTINATION_DIR);
         return properties;
     }
 
@@ -203,6 +227,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
         final InputStream in;
         try {
             in = transfer.getInputStream(filename, flowFile);
+
             flowFile = session.write(flowFile, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException {
@@ -250,15 +275,34 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
             stopWatch.getElapsed(TimeUnit.MILLISECONDS));
         session.transfer(flowFile, REL_SUCCESS);
 
-        // delete remote file is necessary
-        final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean();
-        if (deleteOriginal) {
+        // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
+        // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
+        // result in data loss! If we commit the session first, we are safe.
+        session.commit();
+
+        final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
+        if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
             try {
                 transfer.deleteFile(null, filename);
             } catch (final FileNotFoundException e) {
                 // file doesn't exist -- effectively the same as removing it. Move on.
             } catch (final IOException ioe) {
-                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe);
+                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
+                    new Object[] {flowFile, host, port, filename, ioe}, ioe);
+            }
+        } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
+            String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
+            if (!targetDir.endsWith("/")) {
+                targetDir = targetDir + "/";
+            }
+            final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
+            final String target = targetDir + simpleFilename;
+
+            try {
+                transfer.rename(filename, target);
+            } catch (final IOException ioe) {
+                getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
+                    new Object[] {flowFile, host, port, filename, ioe}, ioe);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
index 6387e19..ad81c83 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -35,8 +34,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 
-
-@SupportsBatching
+// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
 @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
 @CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
 @SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
@@ -50,15 +48,18 @@ public class FetchSFTP extends FetchFileTransfer {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(FetchFileTransfer.HOSTNAME);
-        properties.add(SFTPTransfer.PORT);
+        properties.add(HOSTNAME);
+        properties.add(port);
         properties.add(SFTPTransfer.USERNAME);
         properties.add(SFTPTransfer.PASSWORD);
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
         properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
-        properties.add(FetchFileTransfer.REMOTE_FILENAME);
-        properties.add(SFTPTransfer.DELETE_ORIGINAL);
+        properties.add(REMOTE_FILENAME);
+        properties.add(COMPLETION_STRATEGY);
+        properties.add(MOVE_DESTINATION_DIR);
         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
         properties.add(SFTPTransfer.DATA_TIMEOUT);
         properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index d6e1cd1..1176fd0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -38,6 +38,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
         .required(true)
         .expressionLanguageSupported(true)
         .build();
+    static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder()
+        .name("Port")
+        .description("The port to connect to on the remote host to fetch the data from")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
     public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder()
         .name("Remote Path")
         .description("The path on the remote system from which to pull or push files")
@@ -52,6 +59,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
     protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        attributes.put(getProtocolName() + ".remote.port", context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
         attributes.put("file.owner", fileInfo.getOwner());
         attributes.put("file.group", fileInfo.getGroup());
         attributes.put("file.permissions", fileInfo.getPermissions());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 3b6b69e..925e5f8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
 @SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class})
 @WritesAttributes({
     @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"),
+    @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was connected to on the SFTP Server"),
     @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"),
     @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"),
     @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"),
@@ -48,9 +49,11 @@ public class ListSFTP extends ListFileTransfer {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
+
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(SFTPTransfer.HOSTNAME);
-        properties.add(SFTPTransfer.PORT);
+        properties.add(HOSTNAME);
+        properties.add(port);
         properties.add(SFTPTransfer.USERNAME);
         properties.add(SFTPTransfer.PASSWORD);
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
index 56489f0..2d9525f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
@@ -61,7 +61,7 @@ public class EntityListing {
     /**
      * Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was
      * equal to {@link #getLatestTimestamp()}
-     * 
+     *
      * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp
      */
     public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 7f659d4..a038eb7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer {
                 client.disconnect();
             }
         } catch (final Exception ex) {
-            logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex);
+            logger.warn("Failed to close FTPClient due to {}", new Object[] {ex.toString()}, ex);
         }
         client = null;
     }
@@ -334,9 +334,9 @@ public class FTPTransfer implements FileTransfer {
         final boolean cdSuccessful = setWorkingDirectory(remoteDirectory);
 
         if (!cdSuccessful) {
-            logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory });
+            logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
             if (client.makeDirectory(remoteDirectory)) {
-                logger.debug("Created {}", new Object[] { remoteDirectory });
+                logger.debug("Created {}", new Object[] {remoteDirectory});
             } else {
                 throw new IOException("Failed to create remote directory " + remoteDirectory);
             }
@@ -392,10 +392,10 @@ public class FTPTransfer implements FileTransfer {
                 final String time = outformat.format(fileModifyTime);
                 if (!client.setModificationTime(tempFilename, time)) {
                     // FTP server probably doesn't support MFMT command
-                    logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime });
+                    logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] {flowFile, lastModifiedTime});
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e });
+                logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {flowFile, lastModifiedTime, e});
             }
         }
         final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
@@ -404,17 +404,17 @@ public class FTPTransfer implements FileTransfer {
                 int perms = numberPermissions(permissions);
                 if (perms >= 0) {
                     if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) {
-                        logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions });
+                        logger.warn("Could not set permission on {} to {}", new Object[] {flowFile, permissions});
                     }
                 }
             } catch (final Exception e) {
-                logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e });
+                logger.error("Failed to set permission on {} to {} due to {}", new Object[] {flowFile, permissions, e});
             }
         }
 
         if (!filename.equals(tempFilename)) {
             try {
-                logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile });
+                logger.debug("Renaming remote path from {} to {} for {}", new Object[] {tempFilename, filename, flowFile});
                 final boolean renameSuccessful = client.rename(tempFilename, filename);
                 if (!renameSuccessful) {
                     throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString());
@@ -432,6 +432,16 @@ public class FTPTransfer implements FileTransfer {
         return fullPath;
     }
 
+
+    @Override
+    public void rename(final String source, final String target) throws IOException {
+        final FTPClient client = getClient(null);
+        final boolean renameSuccessful = client.rename(source, target);
+        if (!renameSuccessful) {
+            throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString());
+        }
+    }
+
     @Override
     public void deleteFile(final String path, final String remoteFileName) throws IOException {
         final FTPClient client = getClient(null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index fe277df..8d48de2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -43,6 +43,8 @@ public interface FileTransfer extends Closeable {
 
     String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException;
 
+    void rename(String source, String target) throws IOException;
+
     void deleteFile(String path, String remoteFileName) throws IOException;
 
     void deleteDirectory(String remoteDirectoryName) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index c28f275..9bad520 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -603,6 +603,23 @@ public class SFTPTransfer implements FileTransfer {
         return fullPath;
     }
 
+    @Override
+    public void rename(final String source, final String target) throws IOException {
+        final ChannelSftp sftp = getChannel(null);
+        try {
+            sftp.rename(source, target);
+        } catch (final SftpException e) {
+            switch (e.id) {
+                case ChannelSftp.SSH_FX_NO_SUCH_FILE:
+                    throw new FileNotFoundException();
+                case ChannelSftp.SSH_FX_PERMISSION_DENIED:
+                    throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
+                default:
+                    throw new IOException(e);
+            }
+        }
+    }
+
     protected int numberPermissions(String perms) {
         int number = -1;
         final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");

http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
index 7aa8f9c..4175a77 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java
@@ -17,7 +17,9 @@
 
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -92,8 +94,119 @@ public class TestFetchFileTransfer {
         runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
     }
 
+
+    @Test
+    public void testMoveFileWithNoTrailingSlashDirName() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+
+        proc.fileContents.containsKey("/moved/hello.txt");
+        assertEquals(1, proc.fileContents.size());
+    }
+
+    @Test
+    public void testMoveFileWithTrailingSlashDirName() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+
+        proc.fileContents.containsKey("/moved/hello.txt");
+        assertEquals(1, proc.fileContents.size());
+    }
+
+    @Test
+    public void testDeleteFile() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertTrue(proc.fileContents.isEmpty());
+    }
+
+    @Test
+    public void testDeleteFails() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue());
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        proc.allowDelete = false;
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertFalse(proc.fileContents.isEmpty());
+    }
+
+    @Test
+    public void testRenameFails() {
+        final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
+        runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
+        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
+        runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
+        runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
+
+        proc.addContent("hello.txt", "world".getBytes());
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        proc.allowDelete = false;
+        proc.allowRename = false;
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
+        assertEquals(1, proc.fileContents.size());
+
+        assertTrue(proc.fileContents.containsKey("hello.txt"));
+    }
+
+
     private static class TestableFetchFileTransfer extends FetchFileTransfer {
         private boolean allowAccess = true;
+        private boolean allowDelete = true;
+        private boolean allowRename = true;
         private boolean closed = false;
         private final Map<String, byte[]> fileContents = new HashMap<>();
 
@@ -154,6 +267,10 @@ public class TestFetchFileTransfer {
 
                 @Override
                 public void deleteFile(String path, String remoteFileName) throws IOException {
+                    if (!allowDelete) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
+
                     if (!fileContents.containsKey(remoteFileName)) {
                         throw new FileNotFoundException();
                     }
@@ -162,6 +279,20 @@ public class TestFetchFileTransfer {
                 }
 
                 @Override
+                public void rename(String source, String target) throws IOException {
+                    if (!allowRename) {
+                        throw new PermissionDeniedException("test permission denied");
+                    }
+
+                    if (!fileContents.containsKey(source)) {
+                        throw new FileNotFoundException();
+                    }
+
+                    final byte[] content = fileContents.remove(source);
+                    fileContents.put(target, content);
+                }
+
+                @Override
                 public void deleteDirectory(String remoteDirectoryName) throws IOException {
 
                 }