Posted to commits@nifi.apache.org by tu...@apache.org on 2021/02/24 12:20:29 UTC

[nifi] branch main updated: NIFI-6113 Refactoring GetHDFSFileInfo to remove instance level variable to make it stateless.

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2322b2c  NIFI-6113 Refactoring GetHDFSFileInfo to remove instance level variable to make it stateless.
2322b2c is described below

commit 2322b2cddf8eac123e2ecb1bf48c7daaca932565
Author: Stan Antyufeev <st...@gmail.com>
AuthorDate: Mon Feb 22 18:46:09 2021 -0600

    NIFI-6113 Refactoring GetHDFSFileInfo to remove instance level variable to make it stateless.
    
    This closes #4837.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/processors/hadoop/GetHDFSFileInfo.java    | 424 +++++++++++----------
 1 file changed, 217 insertions(+), 207 deletions(-)
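
The essence of the change shows up in onTrigger below: the cached HDFSFileInfoRequest field, the onPropertyModified override, and updateRequestDetails are removed, and the request object is rebuilt from the ProcessContext on every invocation, so the processor keeps no mutable state between triggers. A minimal sketch of that pattern, using simplified, hypothetical names rather than the actual processor code:

// Sketch only: illustrates the stateless pattern this commit adopts,
// with hypothetical names; it is not the processor's actual code.
import java.util.regex.Pattern;

class StatelessRequestSketch {

    // Per-invocation snapshot of configuration; nothing is stored on the processor.
    static final class Request {
        final String fullPath;
        final Pattern dirFilter;

        Request(final String fullPath, final Pattern dirFilter) {
            this.fullPath = fullPath;
            this.dirFilter = dirFilter;
        }
    }

    // Called from each trigger: builds a fresh Request from the current
    // property values, so there is no cache to invalidate when properties change.
    Request buildRequest(final String fullPath, final String dirFilterRegex) {
        return new Request(fullPath, dirFilterRegex == null ? null : Pattern.compile(dirFilterRegex));
    }
}

Rebuilding the request per call means the filter Patterns are recompiled on each trigger, trading the previous caching optimization (the old "avoid rebuilding req object's patterns" path) for thread safety and simpler code.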

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
index 56c895b..f2864f0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java
@@ -25,11 +25,9 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,7 +55,10 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
+
+import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.ALL;
+import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.DIR;
+import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.NONE;
 
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@@ -65,31 +66,31 @@ import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Gro
         + "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. "
         + "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. "
         + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. "
-        )
+)
 @WritesAttributes({
-    @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."),
-    @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. "
-            + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"),
-    @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"),
-    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"),
-    @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"),
-    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
-    @WritesAttribute(attribute="hdfs.length", description=""
-            + "In case of files: The number of bytes in the file in HDFS.  "
-            + "In case of dirs: Retuns storage space consumed by directory. "
-            + ""),
-    @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory' will represent total count of files under this dir. "
-            + "Won't be populated to other types of HDFS objects. "),
-    @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory' will represent total count of directories under this dir (including itself). "
-            + "Won't be populated to other types of HDFS objects. "),
-    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"),
-    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
-            + "3 for the group, and 3 for other users. For example rw-rw-r--"),
-    @WritesAttribute(attribute="hdfs.status", description="The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. "
-            + "Status won't be set if no errors occured."),
-    @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format."
-            + "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. "
-            + "Use content destination for such cases")
+        @WritesAttribute(attribute = "hdfs.objectName", description = "The name of the file/dir found on HDFS."),
+        @WritesAttribute(attribute = "hdfs.path", description = "The path is set to the absolute path of the object's parent directory on HDFS. "
+                + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"),
+        @WritesAttribute(attribute = "hdfs.type", description = "The type of an object. Possible values: directory, file, link"),
+        @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the object in HDFS"),
+        @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the object in HDFS"),
+        @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+        @WritesAttribute(attribute = "hdfs.length", description = ""
+                + "In case of files: The number of bytes in the file in HDFS.  "
+                + "In case of dirs: Retuns storage space consumed by directory. "
+                + ""),
+        @WritesAttribute(attribute = "hdfs.count.files", description = "In case of type='directory' will represent total count of files under this dir. "
+                + "Won't be populated to other types of HDFS objects. "),
+        @WritesAttribute(attribute = "hdfs.count.dirs", description = "In case of type='directory' will represent total count of directories under this dir (including itself). "
+                + "Won't be populated to other types of HDFS objects. "),
+        @WritesAttribute(attribute = "hdfs.replication", description = "The number of HDFS replicas for the file"),
+        @WritesAttribute(attribute = "hdfs.permissions", description = "The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
+                + "3 for the group, and 3 for other users. For example rw-rw-r--"),
+        @WritesAttribute(attribute = "hdfs.status", description = "The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. "
+                + "Status won't be set if no errors occured."),
+        @WritesAttribute(attribute = "hdfs.full.tree", description = "When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format."
+                + "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. "
+                + "Use content destination for such cases")
 })
 @SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 public class GetHDFSFileInfo extends AbstractHadoopProcessor {
@@ -166,7 +167,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
 
     static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory",
             "Group HDFS objects by their parent directories only. Processor will generate flowfile for each directory (if recursive). "
-            + "If 'Recurse Subdirectories' property set to 'false', then will have the same effect as 'All'");
+                    + "If 'Recurse Subdirectories' property set to 'false', then will have the same effect as 'All'");
 
     static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None",
             "Don't group results. Generate flowfile per each HDFS object.");
@@ -185,15 +186,15 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             .displayName("Batch Size")
             .name("gethdfsfileinfo-batch-size")
             .description("Number of records to put into an output flowfile when 'Destination' is set to 'Content'"
-                + " and 'Group Results' is set to 'None'")
+                    + " and 'Group Results' is set to 'None'")
             .required(false)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
     static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes",
             "Details of given HDFS object will be stored in attributes of flowfile. "
-            + "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. "
-            + "Use content destination for such cases.");
+                    + "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. "
+                    + "Use content destination for such cases.");
 
     static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content",
             "Details of given HDFS object will be stored in a content in JSON format");
@@ -228,8 +229,6 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             .description("All failed attempts to access HDFS will be routed to this relationship")
             .build();
 
-    private HDFSFileInfoRequest req;
-
     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
@@ -262,13 +261,6 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     }
 
     @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
-        super.onPropertyModified(descriptor, oldValue, newValue);
-        // drop request details to rebuild it
-        req = null;
-    }
-
-    @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
 
@@ -277,16 +269,16 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         String batchSize = validationContext.getProperty(BATCH_SIZE).getValue();
 
         if (
-            (!DESTINATION_CONTENT.getValue().equals(destination) || !GROUP_NONE.getValue().equals(grouping))
-                && batchSize != null
+                (!DESTINATION_CONTENT.getValue().equals(destination) || !GROUP_NONE.getValue().equals(grouping))
+                        && batchSize != null
         ) {
             validationResults.add(new ValidationResult.Builder()
-                .valid(false)
-                .subject(BATCH_SIZE.getDisplayName())
-                .explanation("'" + BATCH_SIZE.getDisplayName() + "' is applicable only when " +
-                    "'" + DESTINATION.getDisplayName() + "'='" + DESTINATION_CONTENT.getDisplayName() + "' and " +
-                    "'" + GROUPING.getDisplayName() + "'='" + GROUP_NONE.getDisplayName() + "'")
-                .build());
+                    .valid(false)
+                    .subject(BATCH_SIZE.getDisplayName())
+                    .explanation("'" + BATCH_SIZE.getDisplayName() + "' is applicable only when " +
+                            "'" + DESTINATION.getDisplayName() + "'='" + DESTINATION_CONTENT.getDisplayName() + "' and " +
+                            "'" + GROUPING.getDisplayName() + "'='" + GROUP_NONE.getDisplayName() + "'")
+                    .build());
         }
 
         return validationResults;
@@ -311,20 +303,13 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             ff = session.create();
             scheduledFF = true;
         }
-
-        if (req == null) {
-            //rebuild the request details based on
-            req = buildRequestDetails(context, session, ff);
-        }else {
-            //avoid rebuilding req object's patterns in order to have better performance
-            req = updateRequestDetails(context, session, ff);
-        }
+        HDFSFileInfoRequest req = buildRequestDetails(context, ff);
 
         try {
             final FileSystem hdfs = getFileSystem();
             UserGroupInformation ugi = getUserGroupInformation();
             ExecutionContext executionContext = new ExecutionContext();
-            HDFSObjectInfoDetails res = walkHDFSTree(context, session, executionContext, ff, hdfs, ugi, req, null, false);
+            HDFSObjectInfoDetails res = walkHDFSTree(session, executionContext, ff, hdfs, ugi, req, null, false);
             executionContext.finish(session);
             if (res == null) {
                 ff = session.putAttribute(ff, "hdfs.status", "Path not found: " + req.fullPath);
@@ -333,25 +318,18 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             }
             if (!scheduledFF) {
                 session.transfer(ff, REL_ORIGINAL);
-            }else {
+            } else {
                 session.remove(ff);
             }
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
-            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
-            session.transfer(ff, REL_FAILURE);
-            return;
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
             getLogger().error("Interrupted while performing listing of HDFS", e);
             ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
             session.transfer(ff, REL_FAILURE);
-            return;
         } catch (final Exception e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
             ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
             session.transfer(ff, REL_FAILURE);
-            return;
         }
     }
 
@@ -359,23 +337,23 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     /*
      * Walks thru HDFS tree. This method will return null to the main if there is no provided path existing.
      */
-    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, ExecutionContext executionContext,
-             FlowFile origFF, final FileSystem hdfs, final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent,
-             final boolean statsOnly
-    ) throws Exception{
+    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessSession session, ExecutionContext executionContext,
+                                                 FlowFile origFF, final FileSystem hdfs, final UserGroupInformation ugi,
+                                                 final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly
+    ) throws Exception {
 
         final HDFSObjectInfoDetails p = parent;
 
-        if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(p != null ? p.getPath() : new Path(req.fullPath)))) {
-                return null;
+        if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(p != null ? p.getPath() : new Path(req.getFullPath())))) {
+            return null;
         }
 
         if (parent == null) {
-            parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(new Path(req.fullPath))));
+            parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(new Path(req.getFullPath()))));
         }
         if (parent.isFile() && p == null) {
             //single file path requested and found, lets send to output:
-            processHDFSObject(context, session, executionContext, origFF, req, parent, true);
+            processHDFSObject(session, executionContext, origFF, req, parent, true);
             return parent;
         }
 
@@ -384,26 +362,26 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         FileStatus[] listFSt = null;
         try {
             listFSt = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
-        }catch (IOException e) {
+        } catch (IOException e) {
             parent.error = "Couldn't list directory: " + e;
-            processHDFSObject(context, session, executionContext, origFF, req, parent, p == null);
+            processHDFSObject(session, executionContext, origFF, req, parent, p == null);
             return parent; //File not found exception, or access denied - don't interrupt, just don't list
         }
         if (listFSt != null) {
             for (FileStatus f : listFSt) {
                 HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f);
                 HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req);
-                if (o.isDirectory() && !o.isSymlink() && req.isRecursive) {
-                    o = walkHDFSTree(context, session, executionContext, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
+                if (o.isDirectory() && !o.isSymlink() && req.isRecursive()) {
+                    o = walkHDFSTree(session, executionContext, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
                     parent.countDirs += o.countDirs;
                     parent.totalLen += o.totalLen;
                     parent.countFiles += o.countFiles;
-                }else if (o.isDirectory() && o.isSymlink()) {
+                } else if (o.isDirectory() && o.isSymlink()) {
                     parent.countDirs += 1;
-                }else if (o.isFile() && !o.isSymlink()) {
+                } else if (o.isFile() && !o.isSymlink()) {
                     parent.countFiles += 1;
                     parent.totalLen += o.getLen();
-                }else if (o.isFile() && o.isSymlink()) {
+                } else if (o.isFile() && o.isSymlink()) {
                     parent.countFiles += 1; // do not add length of the symlink, as it doesn't consume space under THIS directory, but count files, as it is still an object.
                 }
 
@@ -411,14 +389,14 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 if (vo != null && !statsOnly) {
                     parent.addChild(vo);
                     if (vo.isFile() && !vo.isSymlink()) {
-                        processHDFSObject(context, session, executionContext, origFF, req, vo, false);
+                        processHDFSObject(session, executionContext, origFF, req, vo, false);
                     }
                 }
             }
             if (!statsOnly) {
-                processHDFSObject(context, session, executionContext, origFF, req, parent, p==null);
+                processHDFSObject(session, executionContext, origFF, req, parent, p == null);
             }
-            if (req.groupping != Groupping.ALL) {
+            if (req.getGrouping() != ALL) {
                 parent.setChildren(null); //we need children in full tree only when single output requested.
             }
         }
@@ -432,27 +410,25 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         }
 
         if (o.isFile()) {
-            if (req.isIgnoreDotFiles && o.getPath().getName().startsWith(".")) {
+            if (req.isIgnoreDotFiles() && o.getPath().getName().startsWith(".")) {
                 return null;
-            }else if (req.fileExcludeFilter != null && req.fileExcludeFilter.matcher(o.getPath().getName()).matches()) {
+            } else if (req.getFileExcludeFilter() != null && req.getFileExcludeFilter().matcher(o.getPath().getName()).matches()) {
                 return null;
-            }else if (req.fileFilter == null) {
+            } else if (req.getFileFilter() != null && req.getFileFilter().matcher(o.getPath().getName()).matches()) {
                 return o;
-            }else if (req.fileFilter != null && req.fileFilter.matcher(o.getPath().getName()).matches()) {
+            } else if (req.getFileFilter() == null) {
                 return o;
-            }else {
-                return null;
             }
+            return null;
         }
+
         if (o.isDirectory()) {
-            if (req.isIgnoreDotDirs && o.getPath().getName().startsWith(".")) {
+            if (req.isIgnoreDotDirs() && o.getPath().getName().startsWith(".")) {
                 return null;
-            }else if (req.dirFilter == null) {
+            } else if (req.getDirFilter() != null && req.getDirFilter().matcher(o.getPath().getName()).matches()) {
                 return o;
-            }else if (req.dirFilter != null && req.dirFilter.matcher(o.getPath().getName()).matches()) {
+            } else if (req.getDirFilter() == null) {
                 return o;
-            }else {
-                return null;
             }
         }
         return null;
@@ -462,8 +438,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
      * Checks whether HDFS object should be sent to output.
      * If it should be sent, new flowfile will be created, its content and attributes will be populated according to other request params.
      */
-    protected HDFSObjectInfoDetails processHDFSObject(
-            final ProcessContext context,
+    protected void processHDFSObject(
             final ProcessSession session,
             final ExecutionContext executionContext,
             FlowFile origFF,
@@ -471,72 +446,70 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             final HDFSObjectInfoDetails o,
             final boolean isRoot
     ) {
-        if (o.isFile() && req.groupping != Groupping.NONE) {
-            return null; //there is grouping by either root directory or every directory, no need to print separate files.
+        if (o.isFile() && req.getGrouping() != NONE) {
+            return;
         }
-        if (o.isDirectory() && o.isSymlink() && req.groupping != Groupping.NONE) {
-            return null; //ignore symlink dirs an
+        if (o.isDirectory() && o.isSymlink() && req.getGrouping() != NONE) {
+            return;
         }
-        if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) {
-            return null;
+        if (o.isDirectory() && req.getGrouping() == ALL && !isRoot) {
+            return;
         }
 
         FlowFile ff = getReadyFlowFile(executionContext, session, origFF);
 
         //if destination type is content - always add mime type
-        if (req.isDestContent) {
+        if (req.isDestContent()) {
             ff = session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
         }
 
         //won't combine conditions for similar actions for better readability and maintenance.
-        if (o.isFile() && isRoot &&  req.isDestContent) {
+        if (o.isFile() && isRoot && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
             // ------------------------------
-        }else if (o.isFile() && isRoot &&  !req.isDestContent) {
+        } else if (o.isFile() && isRoot && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             // ------------------------------
-        }else if (o.isFile() && req.isDestContent) {
+        } else if (o.isFile() && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
             // ------------------------------
-        }else if (o.isFile() && !req.isDestContent) {
+        } else if (o.isFile() && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && o.isSymlink() && req.isDestContent) {
+        } else if (o.isDirectory() && o.isSymlink() && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) {
+        } else if (o.isDirectory() && o.isSymlink() && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == NONE && req.isDestContent()) {
             o.setChildren(null);
             ff = addAsContent(executionContext, session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == NONE && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == DIR && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == DIR && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             ff = addFullTreeToAttribute(session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == ALL && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
             // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == ALL && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             ff = addFullTreeToAttribute(session, o, ff);
-        }else {
+        } else {
             getLogger().error("Illegal State!");
             session.remove(ff);
-            return null;
+            return;
         }
 
         executionContext.flowfile = ff;
-        finishProcessing(executionContext, session);
-
-        return o;
+        finishProcessing(req, executionContext, session);
     }
 
     private FlowFile getReadyFlowFile(ExecutionContext executionContext, ProcessSession session, FlowFile origFF) {
@@ -547,13 +520,12 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         return executionContext.flowfile;
     }
 
-    private void finishProcessing(ExecutionContext executionContext, ProcessSession session) {
+    private void finishProcessing(HDFSFileInfoRequest req, ExecutionContext executionContext, ProcessSession session) {
         executionContext.nrOfWaitingHDFSObjects++;
 
-        if (req.groupping == Groupping.NONE && req.isDestContent && executionContext.nrOfWaitingHDFSObjects < req.batchSize) {
+        if (req.grouping == NONE && req.isDestContent() && executionContext.nrOfWaitingHDFSObjects < req.getBatchSize()) {
             return;
         }
-
         session.transfer(executionContext.flowfile, REL_SUCCESS);
 
         executionContext.reset();
@@ -608,81 +580,39 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
      * Creates internal request object and initialize the fields that won't be changed every call (onTrigger).
      * Dynamic fields will be updated per each call separately.
      */
-    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
+    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, FlowFile ff) {
         HDFSFileInfoRequest req = new HDFSFileInfoRequest();
-        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
-        req.isRecursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        req.setFullPath(context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue());
+        req.setRecursive(context.getProperty(RECURSE_SUBDIRS).asBoolean());
 
-        PropertyValue pv = null;
-        String v = null;
+        PropertyValue pv;
+        String v;
 
-        if (context.getProperty(DIR_FILTER).isSet() && (pv=context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff))!=null) {
+        if (context.getProperty(DIR_FILTER).isSet() && (pv = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff)) != null) {
             v = pv.getValue();
-            req.dirFilter = v == null ? null : Pattern.compile(v);
+            req.setDirFilter(v == null ? null : Pattern.compile(v));
         }
 
-        if (context.getProperty(FILE_FILTER).isSet() && (pv=context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff))!=null) {
+        if (context.getProperty(FILE_FILTER).isSet() && (pv = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff)) != null) {
             v = pv.getValue();
-            req.fileFilter = v == null ? null : Pattern.compile(v);
+            req.setFileFilter(v == null ? null : Pattern.compile(v));
         }
 
-        if (context.getProperty(FILE_EXCLUDE_FILTER).isSet() && (pv=context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff))!=null) {
+        if (context.getProperty(FILE_EXCLUDE_FILTER).isSet()
+                && (pv = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff)) != null) {
             v = pv.getValue();
-            req.fileExcludeFilter = v == null ? null : Pattern.compile(v);
+            req.setFileExcludeFilter(v == null ? null : Pattern.compile(v));
         }
 
-        req.isIgnoreDotFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
-        req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean();
+        req.setIgnoreDotFiles(context.getProperty(IGNORE_DOTTED_FILES).asBoolean());
+        req.setIgnoreDotDirs(context.getProperty(IGNORE_DOTTED_DIRS).asBoolean());
 
-        req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue());
-        req.batchSize = Optional.ofNullable(context.getProperty(BATCH_SIZE))
-            .filter(propertyValue -> propertyValue.getValue() != null)
-            .map(PropertyValue::asInteger)
-            .orElse(1);
+        req.setGrouping(HDFSFileInfoRequest.Grouping.getEnum(context.getProperty(GROUPING).getValue()));
+        req.setBatchSize(context.getProperty(BATCH_SIZE).asInteger() != null ? context.getProperty(BATCH_SIZE).asInteger() : 1);
 
         v = context.getProperty(DESTINATION).getValue();
-        if (DESTINATION_CONTENT.getValue().equals(v)) {
-            req.isDestContent = true;
-        }else {
-            req.isDestContent = false;
-        }
-
-        return req;
-    }
-
-    /*
-     * Creates internal request object if not created previously, and updates it with dynamic property every time onTrigger is called.
-     * Avoids creating regex Patter objects unless their actual value are changed due to evaluation of EL
-     */
-    protected HDFSFileInfoRequest updateRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
-
-        if (req == null) {
-            return buildRequestDetails(context, session, ff);
-        }
-        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
-
-        String currValue = null;
-        String oldValue = null;
-
-        currValue = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff).getValue();
-        oldValue = req.dirFilter == null ? null : req.dirFilter.toString();
-        if (StringUtils.compare(currValue, oldValue) != 0) {
-            req.dirFilter = currValue == null ? null : Pattern.compile(currValue);
-        }
-
-
-        currValue = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff).getValue();
-        oldValue = req.fileFilter == null ? null : req.fileFilter.toString();
-        if (StringUtils.compare(currValue, oldValue) != 0) {
-            req.fileFilter = currValue == null ? null : Pattern.compile(currValue);
-        }
 
-
-        currValue = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff).getValue();
-        oldValue = req.fileExcludeFilter == null ? null : req.fileExcludeFilter.toString();
-        if (StringUtils.compare(currValue, oldValue) != 0) {
-            req.fileExcludeFilter = currValue == null ? null : Pattern.compile(currValue);
-        }
+        req.setDestContent(DESTINATION_CONTENT.getValue().equals(v));
 
         return req;
     }
@@ -707,15 +637,15 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     /*
      * Keeps all request details in single object.
      */
-    static class HDFSFileInfoRequest{
-        enum Groupping {
+    static class HDFSFileInfoRequest {
+        enum Grouping {
             ALL(GROUP_ALL.getValue()),
             DIR(GROUP_PARENT_DIR.getValue()),
             NONE(GROUP_NONE.getValue());
 
-            private String val;
+            final private String val;
 
-            Groupping(String val){
+            Grouping(String val) {
                 this.val = val;
             }
 
@@ -723,8 +653,8 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 return this.val;
             }
 
-            public static Groupping getEnum(String value) {
-                for (Groupping v : values()) {
+            public static Grouping getEnum(String value) {
+                for (Grouping v : values()) {
                     if (v.val.equals(value)) {
                         return v;
                     }
@@ -733,23 +663,103 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             }
         }
 
-        String fullPath;
-        boolean isRecursive;
-        Pattern dirFilter;
-        Pattern fileFilter;
-        Pattern fileExcludeFilter;
-        boolean isIgnoreDotFiles;
-        boolean isIgnoreDotDirs;
-        boolean isDestContent;
-        Groupping groupping;
-        int batchSize;
+        private String fullPath;
+        private boolean recursive;
+        private Pattern dirFilter;
+        private Pattern fileFilter;
+        private Pattern fileExcludeFilter;
+        private boolean ignoreDotFiles;
+        private boolean ignoreDotDirs;
+        private boolean destContent;
+        private Grouping grouping;
+        private int batchSize;
+
+        String getFullPath() {
+            return fullPath;
+        }
+
+        void setFullPath(String fullPath) {
+            this.fullPath = fullPath;
+        }
+
+        boolean isRecursive() {
+            return this.recursive;
+        }
+
+        void setRecursive(boolean recursive) {
+            this.recursive = recursive;
+        }
+
+        Pattern getDirFilter() {
+            return this.dirFilter;
+        }
+
+        void setDirFilter(Pattern dirFilter) {
+            this.dirFilter = dirFilter;
+        }
+
+        Pattern getFileFilter() {
+            return fileFilter;
+        }
+
+        void setFileFilter(Pattern fileFilter) {
+            this.fileFilter = fileFilter;
+        }
+
+        Pattern getFileExcludeFilter() {
+            return fileExcludeFilter;
+        }
+
+        void setFileExcludeFilter(Pattern fileExcludeFilter) {
+            this.fileExcludeFilter = fileExcludeFilter;
+        }
+
+        boolean isIgnoreDotFiles() {
+            return ignoreDotFiles;
+        }
+
+        void setIgnoreDotFiles(boolean ignoreDotFiles) {
+            this.ignoreDotFiles = ignoreDotFiles;
+        }
+
+        boolean isIgnoreDotDirs() {
+            return this.ignoreDotDirs;
+        }
+
+        void setIgnoreDotDirs(boolean ignoreDotDirs) {
+            this.ignoreDotDirs = ignoreDotDirs;
+        }
+
+        boolean isDestContent() {
+            return this.destContent;
+        }
+
+        void setDestContent(boolean destContent) {
+            this.destContent = destContent;
+        }
+
+        Grouping getGrouping() {
+            return grouping;
+        }
+
+        void setGrouping(Grouping grouping) {
+            this.grouping = grouping;
+        }
+
+        int getBatchSize() {
+            return this.batchSize;
+        }
+
+        void setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+        }
     }
 
     /*
      * Keeps details of HDFS objects.
      * This class is based on FileStatus and adds additional feature/properties for count, total size of directories, and subtrees/hierarchy of recursive listings.
      */
-    class HDFSObjectInfoDetails extends FileStatus{
+    class HDFSObjectInfoDetails extends FileStatus {
 
         private long countFiles;
         private long countDirs = 1;
@@ -757,7 +767,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         private Collection<HDFSObjectInfoDetails> children = new LinkedList<>();
         private String error;
 
-        HDFSObjectInfoDetails(FileStatus fs) throws IOException{
+        HDFSObjectInfoDetails(FileStatus fs) throws IOException {
             super(fs);
         }
 
@@ -812,17 +822,17 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 this.totalLen = 0;
             }
 
-            for(HDFSObjectInfoDetails c : children) {
+            for (HDFSObjectInfoDetails c : children) {
                 if (c.isSymlink()) {
                     continue; //do not count symlinks. they either will be counted under their actual directories, or won't be count if actual location is not under provided root for scan.
-                }else if (c.isDirectory()) {
+                } else if (c.isDirectory()) {
                     if (deepUpdate) {
                         c.updateTotals(deepUpdate);
                     }
                     this.totalLen += c.totalLen;
                     this.countDirs += c.countDirs;
                     this.countFiles += c.countFiles;
-                }else if (c.isFile()) {
+                } else if (c.isFile()) {
                     this.totalLen += c.getLen();
                     this.countFiles++;
                 }
@@ -833,7 +843,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         /*
          * Since, by definition, FF will keep only attributes for parent/single object, we don't need to recurse the children
          */
-        public Map<String, String> toAttributesMap(){
+        public Map<String, String> toAttributesMap() {
             Map<String, String> map = new HashMap<>();
 
             map.put("hdfs.objectName", this.getPath().getName());
@@ -892,14 +902,14 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 for (HDFSObjectInfoDetails c : this.getChildren()) {
                     c.toJsonString(sb).append(",");
                 }
-                sb.deleteCharAt(sb.length()-1).append("]");
+                sb.deleteCharAt(sb.length() - 1).append("]");
             }
             sb.append("}");
             return sb;
         }
 
         private StringBuilder appendProperty(StringBuilder sb, String name, String value) {
-            return sb.append("\"").append(name).append("\":\"").append(value == null? "": value).append("\"");
+            return sb.append("\"").append(name).append("\":\"").append(value == null ? "" : value).append("\"");
         }
     }
 }
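
A note on validateMatchingPatterns above: after the reordering in this commit, file matching applies the dot-file check first, then the exclude filter, and finally the include filter, where a null include filter accepts everything that survived the earlier checks. A self-contained sketch of that precedence, using a hypothetical standalone helper with plain arguments instead of the request object:

// Sketch of the file-matching precedence in validateMatchingPatterns
// (hypothetical standalone helper, not the processor's API).
import java.util.regex.Pattern;

class FileMatchSketch {

    static boolean matchesFile(final String name, final boolean ignoreDotFiles,
                               final Pattern excludeFilter, final Pattern includeFilter) {
        if (ignoreDotFiles && name.startsWith(".")) {
            return false; // dotted files are dropped first
        }
        if (excludeFilter != null && excludeFilter.matcher(name).matches()) {
            return false; // the exclude filter takes precedence over the include filter
        }
        if (includeFilter == null) {
            return true;  // no include filter configured: everything remaining passes
        }
        return includeFilter.matcher(name).matches();
    }
}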