You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pw...@apache.org on 2019/04/22 16:12:23 UTC

[nifi] branch master updated: NIFI-6049: Updated MoveHDFS to support FF Attribute for Output Directory (#3433)

This is an automated email from the ASF dual-hosted git repository.

pwicks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c2b1e  NIFI-6049: Updated MoveHDFS to support FF Attribute for Output Directory (#3433)
94c2b1e is described below

commit 94c2b1e76ebab33b6417fb46ce046e59c51fcdca
Author: Sivaprasanna <si...@gmail.com>
AuthorDate: Mon Apr 22 21:42:16 2019 +0530

    NIFI-6049: Updated MoveHDFS to support FF Attribute for Output Directory (#3433)
    
    NIFI-6049: Updated MoveHDFS to support FF Attribute for Output Directory
---
 .../java/org/apache/nifi/processors/hadoop/MoveHDFS.java | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index a2292f5..95bcdeb 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -152,7 +152,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             .description("The HDFS directory where the files will be moved to")
             .required(true)
             .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder()
@@ -243,7 +243,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         final FileSystem hdfs = getFileSystem();
         final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(flowFile).getValue();
 
-        Path inputPath = null;
+        Path inputPath;
         try {
             inputPath = new Path(filenameValue);
             if (!hdfs.exists(inputPath)) {
@@ -257,7 +257,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        List<Path> files = new ArrayList<Path>();
+        List<Path> files = new ArrayList<>();
 
         try {
             final StopWatch stopWatch = new StopWatch(true);
@@ -348,7 +348,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                     FlowFile flowFile = session.create(parentFlowFile);
                     try {
                         final String originalFilename = file.getName();
-                        final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
+                        final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue();
+                        final Path configuredRootOutputDirPath = new Path(outputDirValue);
                         final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
                         final boolean destinationExists = hdfs.exists(newFile);
                         // If destination file already exists, resolve that
@@ -502,15 +503,12 @@ public class MoveHDFS extends AbstractHadoopProcessor {
 
         final private String conflictResolution;
         final private String operation;
-        final private Path outputRootDirPath;
         final private Pattern fileFilterPattern;
         final private boolean ignoreDottedFiles;
 
         ProcessorConfiguration(final ProcessContext context) {
             conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
             operation = context.getProperty(OPERATION).getValue();
-            final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
-            outputRootDirPath = new Path(outputDirValue);
             final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
             fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
             ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
@@ -524,10 +522,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             return conflictResolution;
         }
 
-        public Path getOutputDirectory() {
-            return outputRootDirPath;
-        }
-
         protected PathFilter getPathFilter(final Path dir) {
             return new PathFilter() {