Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/29 22:14:24 UTC

[02/13] incubator-nifi git commit: NIFI-533: Fixed code to conform to checkstyle

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 1dd5b91,361f1ed..f7894d9
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@@ -58,13 -58,18 +58,15 @@@ import org.apache.nifi.processor.except
  import org.apache.nifi.processor.util.StandardValidators;
  import org.apache.nifi.util.StopWatch;
  
 -/**
 - * This processor reads files from HDFS into NiFi FlowFiles.
 - */
  @TriggerWhenEmpty
  @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
 -@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles")
 +@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
  @WritesAttributes({
-         @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
-         @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
+     @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
 -    @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if "
 -            + "the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse "
 -            + "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
 -@SeeAlso(PutHDFS.class)
++    @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property "
++            + "is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and "
++            + "a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
 +@SeeAlso({PutHDFS.class, ListHDFS.class})
  public class GetHDFS extends AbstractHadoopProcessor {
  
      public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@@ -73,105 -78,101 +75,101 @@@
  
      // relationships
      public static final Relationship REL_SUCCESS = new Relationship.Builder()
--            .name("success")
--            .description("All files retrieved from HDFS are transferred to this relationship")
--            .build();
++    .name("success")
++    .description("All files retrieved from HDFS are transferred to this relationship")
++    .build();
  
      public static final Relationship REL_PASSTHROUGH = new Relationship.Builder()
--            .name("passthrough")
--            .description(
--                    "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship")
++    .name("passthrough")
++    .description(
++            "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship")
              .build();
  
      // properties
      public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
--            .name(DIRECTORY_PROP_NAME)
--            .description("The HDFS directory from which files should be read")
--            .required(true)
--            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
--            .build();
++    .name(DIRECTORY_PROP_NAME)
++    .description("The HDFS directory from which files should be read")
++    .required(true)
++    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
++    .build();
  
      public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
--            .name("Recurse Subdirectories")
--            .description("Indicates whether to pull files from subdirectories of the HDFS directory")
--            .required(true)
--            .allowableValues("true", "false")
--            .defaultValue("true")
--            .build();
++    .name("Recurse Subdirectories")
++    .description("Indicates whether to pull files from subdirectories of the HDFS directory")
++    .required(true)
++    .allowableValues("true", "false")
++    .defaultValue("true")
++    .build();
  
      public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
--            .name("Keep Source File")
-             .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.")
 -            .description("Determines whether to delete the file from HDFS after it has been successfully transferred")
--            .required(true)
--            .allowableValues("true", "false")
--            .defaultValue("false")
--            .build();
++    .name("Keep Source File")
++    .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.")
++    .required(true)
++    .allowableValues("true", "false")
++    .defaultValue("false")
++    .build();
  
      public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
--            .name("File Filter Regex")
-             .description(
-                     "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched")
 -            .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
 -                    + "Expression will be fetched, otherwise all files will be fetched")
++    .name("File Filter Regex")
++    .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
++            + "Expression will be fetched, otherwise all files will be fetched")
              .required(false)
              .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
              .build();
  
      public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder()
--            .name("Filter Match Name Only")
-             .description(
-                     "If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison")
 -            .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename "
 -                    + "in the regex comparison")
++    .name("Filter Match Name Only")
++    .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename "
++            + "in the regex comparison")
              .required(true)
              .allowableValues("true", "false")
              .defaultValue("true")
              .build();
  
      public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
--            .name("Ignore Dotted Files")
--            .description("If true, files whose names begin with a dot (\".\") will be ignored")
--            .required(true)
--            .allowableValues("true", "false")
--            .defaultValue("true")
--            .build();
++    .name("Ignore Dotted Files")
++    .description("If true, files whose names begin with a dot (\".\") will be ignored")
++    .required(true)
++    .allowableValues("true", "false")
++    .defaultValue("true")
++    .build();
  
      public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
--            .name("Minimum File Age")
-             .description(
-                     "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
 -            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
--            .required(true)
-             .addValidator(
-                     StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
 -            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
--            .defaultValue("0 sec")
--            .build();
++    .name("Minimum File Age")
++    .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
++    .required(true)
++    .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
++    .defaultValue("0 sec")
++    .build();
  
      public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
--            .name("Maximum File Age")
-             .description(
-                     "The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
 -            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
--            .required(false)
-             .addValidator(
-                     StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
 -            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
--            .build();
++    .name("Maximum File Age")
++    .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
++    .required(false)
++    .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
++    .build();
  
      public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
--            .name("Batch Size")
--            .description("The maximum number of files to pull in each iteration, based on run schedule.")
--            .required(true)
--            .defaultValue("100")
--            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
--            .build();
++    .name("Batch Size")
++    .description("The maximum number of files to pull in each iteration, based on run schedule.")
++    .required(true)
++    .defaultValue("100")
++    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
++    .build();
  
      public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
--            .name("Polling Interval")
--            .description("Indicates how long to wait between performing directory listings")
--            .required(true)
--            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
--            .defaultValue("0 sec")
--            .build();
++    .name("Polling Interval")
++    .description("Indicates how long to wait between performing directory listings")
++    .required(true)
++    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
++    .defaultValue("0 sec")
++    .build();
  
      public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
--            .name("IO Buffer Size")
--            .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
--            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
--            .build();
++    .name("IO Buffer Size")
++    .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
++    .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
++    .build();
  
      private static final Set<Relationship> relationships;
      protected static final List<PropertyDescriptor> localProperties;
@@@ -461,11 -463,35 +460,8 @@@
          return files;
      }
  
-     
- 
      /**
-      * Holder for a snapshot in time of some processor properties that are
-      * passed around.
 -     * Returns the relative path of the child that does not include the filename or the root path.
 -     *
 -     * @param root root
 -     * @param child child
 -     * @return the relative path of the child that does not include the filename or the root path
 -     */
 -    public static String getPathDifference(final Path root, final Path child) {
 -        final int depthDiff = child.depth() - root.depth();
 -        if (depthDiff <= 1) {
 -            return "".intern();
 -        }
 -        String lastRoot = root.getName();
 -        Path childsParent = child.getParent();
 -        final StringBuilder builder = new StringBuilder();
 -        builder.append(childsParent.getName());
 -        for (int i = (depthDiff - 3); i >= 0; i--) {
 -            childsParent = childsParent.getParent();
 -            String name = childsParent.getName();
 -            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
 -                break;
 -            }
 -            builder.insert(0, Path.SEPARATOR).insert(0, name);
 -        }
 -        return builder.toString();
 -    }
 -
 -    /**
+      * Holder for a snapshot in time of some processor properties that are passed around.
       */
      protected static class ProcessorConfiguration {
  

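The "path" attribute documented above is derived from the difference between the configured Directory and the directory of each file that is picked up. A minimal, standalone sketch of that behavior follows; it is not part of this commit, the class and method names are hypothetical, and it assumes only the Hadoop Path API already used by the removed getPathDifference() method.

    import org.apache.hadoop.fs.Path;

    public class RelativePathSketch {

        public static void main(final String[] args) {
            final Path root = new Path("/tmp");
            System.out.println(relativePath(root, new Path("/tmp/file.txt")));           // prints "./"
            System.out.println(relativePath(root, new Path("/tmp/abc/1/2/3/file.txt"))); // prints "abc/1/2/3"
        }

        static String relativePath(final Path root, final Path child) {
            // depth() counts path elements, so a file directly under the root differs by exactly 1
            final int depthDiff = child.depth() - root.depth();
            if (depthDiff <= 1) {
                return "./";
            }

            // walk up from the file's parent directory, prepending names until the root is reached
            final StringBuilder builder = new StringBuilder();
            Path parent = child.getParent();
            for (int i = 0; i < depthDiff - 1; i++) {
                builder.insert(0, parent.getName());
                if (i < depthDiff - 2) {
                    builder.insert(0, Path.SEPARATOR);
                }
                parent = parent.getParent();
            }
            return builder.toString();
        }
    }
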
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 707b50d,0000000..56a128a
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@@ -1,466 -1,0 +1,469 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Set;
 +
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.permission.FsAction;
 +import org.apache.hadoop.fs.permission.FsPermission;
 +import org.apache.nifi.annotation.behavior.TriggerSerially;
 +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 +import org.apache.nifi.annotation.behavior.WritesAttribute;
 +import org.apache.nifi.annotation.behavior.WritesAttributes;
 +import org.apache.nifi.annotation.documentation.CapabilityDescription;
 +import org.apache.nifi.annotation.documentation.SeeAlso;
 +import org.apache.nifi.annotation.documentation.Tags;
 +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 +import org.apache.nifi.annotation.notification.PrimaryNodeState;
 +import org.apache.nifi.components.PropertyDescriptor;
 +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 +import org.apache.nifi.flowfile.FlowFile;
 +import org.apache.nifi.flowfile.attributes.CoreAttributes;
 +import org.apache.nifi.processor.ProcessContext;
 +import org.apache.nifi.processor.ProcessSession;
 +import org.apache.nifi.processor.ProcessorInitializationContext;
 +import org.apache.nifi.processor.Relationship;
 +import org.apache.nifi.processor.exception.ProcessException;
 +import org.apache.nifi.processor.util.StandardValidators;
 +import org.apache.nifi.processors.hadoop.util.HDFSListing;
 +import org.apache.nifi.processors.hadoop.util.StringSerDe;
 +import org.codehaus.jackson.JsonGenerationException;
 +import org.codehaus.jackson.JsonNode;
 +import org.codehaus.jackson.JsonParseException;
 +import org.codehaus.jackson.map.JsonMappingException;
 +import org.codehaus.jackson.map.ObjectMapper;
 +
 +
 +@TriggerSerially
 +@TriggerWhenEmpty
 +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
 +@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
- 		+ "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
- 		+ "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
- 		+ "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
++        + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
++        + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
++        + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
 +@WritesAttributes({
-         @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
-         @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
- 		@WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
- 		@WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
- 		@WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
- 		@WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
- 		@WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
- 		@WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r--")
++    @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
++    @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
++            + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
++            + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
++    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
++    @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
++    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
++    @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
++    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
++    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
++            + "3 for the group, and 3 for other users. For example rw-rw-r--")
 +})
 +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 +public class ListHDFS extends AbstractHadoopProcessor {
- 
- 	public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
- 		.name("Distributed Cache Service")
- 		.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
- 		.required(true)
- 		.identifiesControllerService(DistributedMapCacheClient.class)
- 		.build();
++    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
++        .name("Distributed Cache Service")
++        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
++                + "begins pulling data, it won't duplicate all of the work that has been done.")
++        .required(true)
++        .identifiesControllerService(DistributedMapCacheClient.class)
++        .build();
 +
 +    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
- 	    .name(DIRECTORY_PROP_NAME)
- 	    .description("The HDFS directory from which files should be read")
- 	    .required(true)
- 	    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- 	    .build();
- 	
- 	public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
- 	    .name("Recurse Subdirectories")
- 	    .description("Indicates whether to list files from subdirectories of the HDFS directory")
- 	    .required(true)
- 	    .allowableValues("true", "false")
- 	    .defaultValue("true")
- 	    .build();
- 	
- 	
++        .name(DIRECTORY_PROP_NAME)
++        .description("The HDFS directory from which files should be read")
++        .required(true)
++        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
++        .build();
++
++    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
++        .name("Recurse Subdirectories")
++        .description("Indicates whether to list files from subdirectories of the HDFS directory")
++        .required(true)
++        .allowableValues("true", "false")
++        .defaultValue("true")
++        .build();
++
++
 +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
- 	    .name("success")
- 	    .description("All FlowFiles are transferred to this relationship")
- 	    .build();
++        .name("success")
++        .description("All FlowFiles are transferred to this relationship")
++        .build();
 +
 +    private volatile Long lastListingTime = null;
 +    private volatile Set<Path> latestPathsListed = new HashSet<>();
 +    private volatile boolean electedPrimaryNode = false;
 +    private File persistenceFile = null;
-     
++
 +    @Override
 +    protected void init(final ProcessorInitializationContext context) {
-     	super.init(context);
-     	persistenceFile = new File("conf/state/" + getIdentifier());
++        super.init(context);
++        persistenceFile = new File("conf/state/" + getIdentifier());
++    }
++
++    @Override
++    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
++        final List<PropertyDescriptor> properties = new ArrayList<>();
++        properties.add(HADOOP_CONFIGURATION_RESOURCES);
++        properties.add(DISTRIBUTED_CACHE_SERVICE);
++        properties.add(DIRECTORY);
++        properties.add(RECURSE_SUBDIRS);
++        return properties;
++    }
++
++    @Override
++    public Set<Relationship> getRelationships() {
++        final Set<Relationship> relationships = new HashSet<>();
++        relationships.add(REL_SUCCESS);
++        return relationships;
++    }
++
++    private String getKey(final String directory) {
++        return getIdentifier() + ".lastListingTime." + directory;
++    }
++
++    @OnPrimaryNodeStateChange
++    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
++        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
++            electedPrimaryNode = true;
++        }
++    }
++
++    @Override
++    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
++        if ( descriptor.equals(DIRECTORY) ) {
++            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
++            latestPathsListed = new HashSet<>();
++        }
 +    }
- 	
- 	@Override
- 	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- 		final List<PropertyDescriptor> properties = new ArrayList<>();
- 		properties.add(HADOOP_CONFIGURATION_RESOURCES);
- 		properties.add(DISTRIBUTED_CACHE_SERVICE);
- 		properties.add(DIRECTORY);
- 		properties.add(RECURSE_SUBDIRS);
- 		return properties;
- 	}
- 	
- 	@Override
- 	public Set<Relationship> getRelationships() {
- 		final Set<Relationship> relationships = new HashSet<>();
- 		relationships.add(REL_SUCCESS);
- 		return relationships;
- 	}
- 
- 	private String getKey(final String directory) {
- 		return getIdentifier() + ".lastListingTime." + directory;
- 	}
- 	
- 	@OnPrimaryNodeStateChange
- 	public void onPrimaryNodeChange(final PrimaryNodeState newState) {
- 		if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
- 			electedPrimaryNode = true;
- 		}
- 	}
- 	
- 	@Override
- 	public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- 		if ( descriptor.equals(DIRECTORY) ) {
- 			lastListingTime = null;	// clear lastListingTime so that we have to fetch new time
- 			latestPathsListed = new HashSet<>();
- 		}
- 	}
- 	
- 	private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
- 		final ObjectMapper mapper = new ObjectMapper();
++
++    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
++        final ObjectMapper mapper = new ObjectMapper();
 +        final JsonNode jsonNode = mapper.readTree(serializedState);
 +        return mapper.readValue(jsonNode, HDFSListing.class);
- 	}
- 	
- 	
- 	@Override
- 	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- 		final String directory = context.getProperty(DIRECTORY).getValue();
- 
- 		// Determine the timestamp for the last file that we've listed.
- 		Long minTimestamp = lastListingTime;
- 		if ( minTimestamp == null || electedPrimaryNode ) {
- 			// We haven't yet restored any state from local or distributed state - or it's been at least a minute since
- 			// we have performed a listing. In this case, 
- 			// First, attempt to get timestamp from distributed cache service.
- 			final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
- 			
- 			try {
- 				final StringSerDe serde = new StringSerDe();
- 				final String serializedState = client.get(getKey(directory), serde, serde);
- 				if ( serializedState == null || serializedState.isEmpty() ) {
- 					minTimestamp = null;
- 					this.latestPathsListed = Collections.emptySet();
- 				} else {
- 					final HDFSListing listing = deserialize(serializedState);
- 					this.lastListingTime = listing.getLatestTimestamp().getTime();
- 					minTimestamp = listing.getLatestTimestamp().getTime();
- 					this.latestPathsListed = listing.toPaths();
- 				}
- 				
- 				this.lastListingTime = minTimestamp;
- 				electedPrimaryNode = false;	// no requirement to pull an update from the distributed cache anymore.
- 			} catch (final IOException ioe) {
- 				getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
- 				context.yield();
- 				return;
- 			}
- 			
- 			// Check the persistence file. We want to use the latest timestamp that we have so that
- 			// we don't duplicate data.
- 			try {
- 				if ( persistenceFile.exists() ) {
- 					try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- 						final Properties props = new Properties();
- 						props.load(fis);
- 
- 						// get the local timestamp for this directory, if it exists.
- 						final String locallyPersistedValue = props.getProperty(directory);
- 						if ( locallyPersistedValue != null ) {
- 							final HDFSListing listing = deserialize(locallyPersistedValue);
- 							final long localTimestamp = listing.getLatestTimestamp().getTime();
- 							
- 							// If distributed state doesn't have an entry or the local entry is later than the distributed state,
- 							// update the distributed state so that we are in sync.
- 							if (minTimestamp == null || localTimestamp > minTimestamp) {
- 								minTimestamp = localTimestamp;
- 								
- 								// Our local persistence file shows a later time than the Distributed service.
- 								// Update the distributed service to match our local state.
- 								try {
- 									final StringSerDe serde = new StringSerDe();
- 									client.put(getKey(directory), locallyPersistedValue, serde, serde);
- 								} catch (final IOException ioe) {
- 									getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
- 											+ "state due to {}. If a new node performs HDFS Listing, data duplication may occur", 
- 											new Object[] {directory, locallyPersistedValue, ioe});
- 								}
- 							}
- 						}
- 					}
- 				}
- 			} catch (final IOException ioe) {
- 				getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
- 			}
- 		}
- 		
- 		
- 		// Pull in any file that is newer than the timestamp that we have.
- 		final FileSystem hdfs = hdfsResources.get().getValue();
- 		final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
- 		final Path rootPath = new Path(directory);
- 		
- 		int listCount = 0;
- 		Long latestListingModTime = null;
- 		final Set<FileStatus> statuses;
- 		try {
- 			statuses = getStatuses(rootPath, recursive, hdfs);
- 			for ( final FileStatus status : statuses ) {
- 				// don't get anything where the last modified timestamp is equal to our current timestamp.
- 				// if we do, then we run the risk of multiple files having the same last mod date but us only
- 				// seeing a portion of them.
- 				// I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
- 				// only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
- 				// modified date not before the current time.
- 				final long fileModTime = status.getModificationTime();
- 				
- 				// we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
- 				// Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
- 				boolean fetch = !status.getPath().getName().endsWith("_COPYING_") &&
- 						(minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
- 				
- 				// Create the FlowFile for this path.
- 				if ( fetch ) {
- 					final Map<String, String> attributes = createAttributes(status);
- 					FlowFile flowFile = session.create();
- 					flowFile = session.putAllAttributes(flowFile, attributes);
- 					session.transfer(flowFile, REL_SUCCESS);
- 					listCount++;
- 					
- 					if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
- 						latestListingModTime = fileModTime;
- 					}
- 				}
- 			}
- 		} catch (final IOException ioe) {
- 			getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
- 			return;
- 		}
- 		
- 		if ( listCount > 0 ) {
- 			getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
- 			session.commit();
- 
- 			// We have performed a listing and pushed the FlowFiles out.
- 			// Now, we need to persist state about the Last Modified timestamp of the newest file
- 			// that we pulled in. We do this in order to avoid pulling in the same file twice.
- 			// However, we want to save the state both locally and remotely.
- 			// We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
- 			// previously Primary Node left off.
- 			// We also store the state locally so that if the node is restarted, and the node cannot contact
- 			// the distributed state cache, the node can continue to run (if it is primary node).
- 			String serializedState = null;
- 			try {
- 				serializedState = serializeState(latestListingModTime, statuses);
- 			} catch (final Exception e) {
- 				getLogger().error("Failed to serialize state due to {}", new Object[] {e});
- 			}
- 			
- 			if ( serializedState != null ) {
- 				// Save our state locally.
- 				try {
- 					persistLocalState(directory, serializedState);
- 				} catch (final IOException ioe) {
- 					getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
- 				}
- 	
- 				// Attempt to save state to remote server.
- 				final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
- 				try {
- 					client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
- 				} catch (final IOException ioe) {
- 					getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
- 				}
- 			}
- 			
- 			lastListingTime = latestListingModTime;
- 		} else {
- 			getLogger().debug("There is no data to list. Yielding.");
- 			context.yield();
- 			
- 			// lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
- 			if ( lastListingTime == null ) {
- 				lastListingTime = 0L;
- 			}
- 			
- 			return;
- 		}
- 	}
- 	
- 	private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
- 		final Set<FileStatus> statusSet = new HashSet<>();
- 		
- 		final FileStatus[] statuses = hdfs.listStatus(path);
- 		
- 		for ( final FileStatus status : statuses ) {
- 			if ( status.isDirectory() ) {
- 				if ( recursive ) {
- 					try {
- 						statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
- 					} catch (final IOException ioe) {
- 						getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
- 					}
- 				}
- 			} else {
- 				statusSet.add(status);
- 			}
- 		}
- 		
- 		return statusSet;
- 	}
- 	
- 	
- 	private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
- 		// we need to keep track of all files that we pulled in that had a modification time equal to
- 		// lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
- 		// that have a mod time equal to that timestamp because more files may come in with the same timestamp
- 		// later in the same millisecond.
- 		if ( statuses.isEmpty() ) {
- 			return null;
- 		} else {
- 			final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
- 			Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
- 				@Override
- 				public int compare(final FileStatus o1, final FileStatus o2) {
- 					return Long.compare(o1.getModificationTime(), o2.getModificationTime());
- 				}
- 			});
- 			
- 			final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
- 			final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
- 			for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
- 				final FileStatus status = sortedStatuses.get(i);
- 				if (status.getModificationTime() == latestListingModTime) {
- 					pathsWithModTimeEqualToListingModTime.add(status.getPath());
- 				}
- 			}
- 			
- 			this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
- 			
- 			final HDFSListing listing = new HDFSListing();
- 			listing.setLatestTimestamp(new Date(latestListingModTime));
- 			final Set<String> paths = new HashSet<>();
- 			for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
- 				paths.add(path.toUri().toString());
- 			}
- 			listing.setMatchingPaths(paths);
- 
- 			final ObjectMapper mapper = new ObjectMapper();
- 			final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
- 			return serializedState;
- 		}
- 	}
- 	
- 	private void persistLocalState(final String directory, final String serializedState) throws IOException {
- 		// we need to keep track of all files that we pulled in that had a modification time equal to
- 		// lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
- 		// that have a mod time equal to that timestamp because more files may come in with the same timestamp
- 		// later in the same millisecond.
- 		final File dir = persistenceFile.getParentFile();
- 		if ( !dir.exists() && !dir.mkdirs() ) {
- 			throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
- 		}
- 		
- 		final Properties props = new Properties();
- 		if ( persistenceFile.exists() ) {
- 			try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- 				props.load(fis);
- 			}
- 		}
- 
- 		props.setProperty(directory, serializedState);
- 		
- 		try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
- 			props.store(fos, null);
- 		}
- 	}
- 
- 	private String getAbsolutePath(final Path path) {
- 		final Path parent = path.getParent();
- 		final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
- 		return prefix + "/" + path.getName();
- 	}
- 	
- 	private Map<String, String> createAttributes(final FileStatus status) {
- 		final Map<String, String> attributes = new HashMap<>();
- 		attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
- 		attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
- 		
- 		attributes.put("hdfs.owner", status.getOwner());
- 		attributes.put("hdfs.group", status.getGroup());
- 		attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
- 		attributes.put("hdfs.length", String.valueOf(status.getLen()));
- 		attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
- 		
- 		final FsPermission permission = status.getPermission();
- 		final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
- 		attributes.put("hdfs.permissions", perms);
- 		return attributes;
- 	}
- 	
- 	private String getPerms(final FsAction action) {
- 		final StringBuilder sb = new StringBuilder();
- 		if ( action.implies(FsAction.READ) ) {
- 			sb.append("r");
- 		} else {
- 			sb.append("-");
- 		}
- 		
- 		if ( action.implies(FsAction.WRITE) ) {
- 			sb.append("w");
- 		} else {
- 			sb.append("-");
- 		}
- 		
- 		if ( action.implies(FsAction.EXECUTE) ) {
- 			sb.append("x");
- 		} else {
- 			sb.append("-");
- 		}
- 		
- 		return sb.toString();
- 	}
++    }
++
++
++    @Override
++    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
++        final String directory = context.getProperty(DIRECTORY).getValue();
++
++        // Determine the timestamp for the last file that we've listed.
++        Long minTimestamp = lastListingTime;
++        if ( minTimestamp == null || electedPrimaryNode ) {
++            // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
++            // we have performed a listing. In this case,
++            // First, attempt to get timestamp from distributed cache service.
++            final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
++
++            try {
++                final StringSerDe serde = new StringSerDe();
++                final String serializedState = client.get(getKey(directory), serde, serde);
++                if ( serializedState == null || serializedState.isEmpty() ) {
++                    minTimestamp = null;
++                    this.latestPathsListed = Collections.emptySet();
++                } else {
++                    final HDFSListing listing = deserialize(serializedState);
++                    this.lastListingTime = listing.getLatestTimestamp().getTime();
++                    minTimestamp = listing.getLatestTimestamp().getTime();
++                    this.latestPathsListed = listing.toPaths();
++                }
++
++                this.lastListingTime = minTimestamp;
++                electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
++            } catch (final IOException ioe) {
++                getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
++                context.yield();
++                return;
++            }
++
++            // Check the persistence file. We want to use the latest timestamp that we have so that
++            // we don't duplicate data.
++            try {
++                if ( persistenceFile.exists() ) {
++                    try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
++                        final Properties props = new Properties();
++                        props.load(fis);
++
++                        // get the local timestamp for this directory, if it exists.
++                        final String locallyPersistedValue = props.getProperty(directory);
++                        if ( locallyPersistedValue != null ) {
++                            final HDFSListing listing = deserialize(locallyPersistedValue);
++                            final long localTimestamp = listing.getLatestTimestamp().getTime();
++
++                            // If distributed state doesn't have an entry or the local entry is later than the distributed state,
++                            // update the distributed state so that we are in sync.
++                            if (minTimestamp == null || localTimestamp > minTimestamp) {
++                                minTimestamp = localTimestamp;
++
++                                // Our local persistence file shows a later time than the Distributed service.
++                                // Update the distributed service to match our local state.
++                                try {
++                                    final StringSerDe serde = new StringSerDe();
++                                    client.put(getKey(directory), locallyPersistedValue, serde, serde);
++                                } catch (final IOException ioe) {
++                                    getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
++                                            + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
++                                            new Object[] {directory, locallyPersistedValue, ioe});
++                                }
++                            }
++                        }
++                    }
++                }
++            } catch (final IOException ioe) {
++                getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
++            }
++        }
++
++
++        // Pull in any file that is newer than the timestamp that we have.
++        final FileSystem hdfs = hdfsResources.get().getValue();
++        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
++        final Path rootPath = new Path(directory);
++
++        int listCount = 0;
++        Long latestListingModTime = null;
++        final Set<FileStatus> statuses;
++        try {
++            statuses = getStatuses(rootPath, recursive, hdfs);
++            for ( final FileStatus status : statuses ) {
++                // don't get anything where the last modified timestamp is equal to our current timestamp.
++                // if we do, then we run the risk of multiple files having the same last mod date but us only
++                // seeing a portion of them.
++                // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
++                // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
++                // modified date not before the current time.
++                final long fileModTime = status.getModificationTime();
++
++                // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
++                // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
++                boolean fetch = !status.getPath().getName().endsWith("_COPYING_")
++                        && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
++
++                // Create the FlowFile for this path.
++                if ( fetch ) {
++                    final Map<String, String> attributes = createAttributes(status);
++                    FlowFile flowFile = session.create();
++                    flowFile = session.putAllAttributes(flowFile, attributes);
++                    session.transfer(flowFile, REL_SUCCESS);
++                    listCount++;
++
++                    if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
++                        latestListingModTime = fileModTime;
++                    }
++                }
++            }
++        } catch (final IOException ioe) {
++            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
++            return;
++        }
++
++        if ( listCount > 0 ) {
++            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
++            session.commit();
++
++            // We have performed a listing and pushed the FlowFiles out.
++            // Now, we need to persist state about the Last Modified timestamp of the newest file
++            // that we pulled in. We do this in order to avoid pulling in the same file twice.
++            // However, we want to save the state both locally and remotely.
++            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
++            // previously Primary Node left off.
++            // We also store the state locally so that if the node is restarted, and the node cannot contact
++            // the distributed state cache, the node can continue to run (if it is primary node).
++            String serializedState = null;
++            try {
++                serializedState = serializeState(latestListingModTime, statuses);
++            } catch (final Exception e) {
++                getLogger().error("Failed to serialize state due to {}", new Object[] {e});
++            }
++
++            if ( serializedState != null ) {
++                // Save our state locally.
++                try {
++                    persistLocalState(directory, serializedState);
++                } catch (final IOException ioe) {
++                    getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
++                }
++
++                // Attempt to save state to remote server.
++                final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
++                try {
++                    client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
++                } catch (final IOException ioe) {
++                    getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
++                }
++            }
++
++            lastListingTime = latestListingModTime;
++        } else {
++            getLogger().debug("There is no data to list. Yielding.");
++            context.yield();
++
++            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
++            if ( lastListingTime == null ) {
++                lastListingTime = 0L;
++            }
++
++            return;
++        }
++    }
++
++    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
++        final Set<FileStatus> statusSet = new HashSet<>();
++
++        final FileStatus[] statuses = hdfs.listStatus(path);
++
++        for ( final FileStatus status : statuses ) {
++            if ( status.isDirectory() ) {
++                if ( recursive ) {
++                    try {
++                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
++                    } catch (final IOException ioe) {
++                        getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
++                    }
++                }
++            } else {
++                statusSet.add(status);
++            }
++        }
++
++        return statusSet;
++    }
++
++
++    private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
++        // we need to keep track of all files that we pulled in that had a modification time equal to
++        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
++        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
++        // later in the same millisecond.
++        if ( statuses.isEmpty() ) {
++            return null;
++        } else {
++            final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
++            Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
++                @Override
++                public int compare(final FileStatus o1, final FileStatus o2) {
++                    return Long.compare(o1.getModificationTime(), o2.getModificationTime());
++                }
++            });
++
++            final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
++            final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
++            for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
++                final FileStatus status = sortedStatuses.get(i);
++                if (status.getModificationTime() == latestListingModTime) {
++                    pathsWithModTimeEqualToListingModTime.add(status.getPath());
++                }
++            }
++
++            this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
++
++            final HDFSListing listing = new HDFSListing();
++            listing.setLatestTimestamp(new Date(latestListingModTime));
++            final Set<String> paths = new HashSet<>();
++            for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
++                paths.add(path.toUri().toString());
++            }
++            listing.setMatchingPaths(paths);
++
++            final ObjectMapper mapper = new ObjectMapper();
++            final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
++            return serializedState;
++        }
++    }
++
++    private void persistLocalState(final String directory, final String serializedState) throws IOException {
++        // we need to keep track of all files that we pulled in that had a modification time equal to
++        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
++        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
++        // later in the same millisecond.
++        final File dir = persistenceFile.getParentFile();
++        if ( !dir.exists() && !dir.mkdirs() ) {
++            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
++        }
++
++        final Properties props = new Properties();
++        if ( persistenceFile.exists() ) {
++            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
++                props.load(fis);
++            }
++        }
++
++        props.setProperty(directory, serializedState);
++
++        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
++            props.store(fos, null);
++        }
++    }
++
++    private String getAbsolutePath(final Path path) {
++        final Path parent = path.getParent();
++        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
++        return prefix + "/" + path.getName();
++    }
++
++    private Map<String, String> createAttributes(final FileStatus status) {
++        final Map<String, String> attributes = new HashMap<>();
++        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
++        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
++
++        attributes.put("hdfs.owner", status.getOwner());
++        attributes.put("hdfs.group", status.getGroup());
++        attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
++        attributes.put("hdfs.length", String.valueOf(status.getLen()));
++        attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
++
++        final FsPermission permission = status.getPermission();
++        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
++        attributes.put("hdfs.permissions", perms);
++        return attributes;
++    }
++
++    private String getPerms(final FsAction action) {
++        final StringBuilder sb = new StringBuilder();
++        if ( action.implies(FsAction.READ) ) {
++            sb.append("r");
++        } else {
++            sb.append("-");
++        }
++
++        if ( action.implies(FsAction.WRITE) ) {
++            sb.append("w");
++        } else {
++            sb.append("-");
++        }
++
++        if ( action.implies(FsAction.EXECUTE) ) {
++            sb.append("x");
++        } else {
++            sb.append("-");
++        }
++
++        return sb.toString();
++    }
 +}
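
For readers following the new local-state logic above, here is a minimal sketch of the inverse operation: restoring the listing state that persistLocalState(...) writes out. The class and method names are hypothetical, the persistence file is assumed to use the same Properties layout shown in the diff (one JSON entry keyed by directory), and the ObjectMapper import assumes the Codehaus Jackson variant; adjust the import if the processor uses a different Jackson package.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.nifi.processors.hadoop.util.HDFSListing;
    import org.codehaus.jackson.map.ObjectMapper;

    public class ListingStateReader {

        /**
         * Reads the serialized HDFSListing for the given directory back out of the
         * local persistence file, or returns null if no state has been stored yet.
         */
        public HDFSListing restoreState(final File persistenceFile, final String directory) throws IOException {
            if (!persistenceFile.exists()) {
                return null;
            }

            final Properties props = new Properties();
            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
                props.load(fis);
            }

            final String serializedState = props.getProperty(directory);
            if (serializedState == null || serializedState.isEmpty()) {
                return null;
            }

            // readValue is the inverse of the writerWithType(...).writeValueAsString(...) call above
            final ObjectMapper mapper = new ObjectMapper();
            return mapper.readValue(serializedState, HDFSListing.class);
        }
    }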

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
index 9f4d68b,0000000..49957f5
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@@ -1,83 -1,0 +1,83 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop.util;
 +
 +import java.util.Collection;
 +import java.util.Date;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +import javax.xml.bind.annotation.XmlTransient;
 +import javax.xml.bind.annotation.XmlType;
 +
 +import org.apache.hadoop.fs.Path;
 +
 +/**
 + * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
-  * we can avoid pulling the same file multiple times 
++ * we can avoid pulling the same file multiple times
 + */
 +@XmlType(name = "listing")
 +public class HDFSListing {
- 	private Date latestTimestamp;
- 	private Collection<String> matchingPaths;
- 	
- 	/**
- 	 * @return the modification date of the newest file that was contained in the HDFS Listing
- 	 */
- 	public Date getLatestTimestamp() {
- 		return latestTimestamp;
- 	}
- 	
- 	/**
- 	 * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
- 	 * 
- 	 * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
- 	 */
- 	public void setLatestTimestamp(Date latestTimestamp) {
- 		this.latestTimestamp = latestTimestamp;
- 	}
- 	
- 	/**
- 	 * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
- 	 * was equal to {@link #getLatestTimestamp()}
- 	 */
- 	@XmlTransient
- 	public Collection<String> getMatchingPaths() {
- 		return matchingPaths;
- 	}
- 	
- 	/**
- 	 * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
- 	 */
- 	public Set<Path> toPaths() {
- 		final Set<Path> paths = new HashSet<>(matchingPaths.size());
- 		for ( final String pathname : matchingPaths ) {
- 			paths.add(new Path(pathname));
- 		}
- 		return paths;
- 	}
++    private Date latestTimestamp;
++    private Collection<String> matchingPaths;
 +
- 	/**
- 	 * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
- 	 * equal to {@link #getLatestTimestamp()}
- 	 * @param matchingPaths
- 	 */
- 	public void setMatchingPaths(Collection<String> matchingPaths) {
- 		this.matchingPaths = matchingPaths;
- 	}
++    /**
++     * @return the modification date of the newest file that was contained in the HDFS Listing
++     */
++    public Date getLatestTimestamp() {
++        return latestTimestamp;
++    }
++
++    /**
++     * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
++     *
++     * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
++     */
++    public void setLatestTimestamp(Date latestTimestamp) {
++        this.latestTimestamp = latestTimestamp;
++    }
++
++    /**
++     * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
++     * was equal to {@link #getLatestTimestamp()}
++     */
++    @XmlTransient
++    public Collection<String> getMatchingPaths() {
++        return matchingPaths;
++    }
++
++    /**
++     * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
++     */
++    public Set<Path> toPaths() {
++        final Set<Path> paths = new HashSet<>(matchingPaths.size());
++        for ( final String pathname : matchingPaths ) {
++            paths.add(new Path(pathname));
++        }
++        return paths;
++    }
++
++    /**
++     * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
++     * equal to {@link #getLatestTimestamp()}
++     * @param matchingPaths the paths that have last modified date matching the latest timestamp
++     */
++    public void setMatchingPaths(Collection<String> matchingPaths) {
++        this.matchingPaths = matchingPaths;
++    }
 +
 +}
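
As a usage illustration for the POJO above (the class name and all values here are made up for the example), it simply round-trips between String representations and Hadoop Path objects:

    import java.util.Arrays;
    import java.util.Date;
    import java.util.Set;

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.util.HDFSListing;

    public class HDFSListingExample {
        public static void main(final String[] args) {
            final HDFSListing listing = new HDFSListing();
            listing.setLatestTimestamp(new Date(1430340864000L));
            listing.setMatchingPaths(Arrays.asList(
                "hdfs://nn:8020/tmp/abc/file-1.txt",
                "hdfs://nn:8020/tmp/abc/file-2.txt"));

            // toPaths() converts the stored Strings back into Path objects
            final Set<Path> paths = listing.toPaths();
            System.out.println(paths.size() + " path(s) share mod time " + listing.getLatestTimestamp());
        }
    }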

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
index ef0e590,0000000..229f26c
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@@ -1,48 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop.util;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +
 +import org.apache.nifi.distributed.cache.client.Deserializer;
 +import org.apache.nifi.distributed.cache.client.Serializer;
 +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
 +
 +public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
 +
- 	@Override
- 	public Long deserialize(final byte[] input) throws DeserializationException, IOException {
- 		if ( input == null || input.length == 0 ) {
- 			return null;
- 		}
- 		
- 		final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
- 		return dis.readLong();
- 	}
++    @Override
++    public Long deserialize(final byte[] input) throws DeserializationException, IOException {
++        if ( input == null || input.length == 0 ) {
++            return null;
++        }
 +
- 	@Override
- 	public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
- 		final DataOutputStream dos = new DataOutputStream(out);
- 		dos.writeLong(value);
- 	}
++        final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
++        return dis.readLong();
++    }
++
++    @Override
++    public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
++        final DataOutputStream dos = new DataOutputStream(out);
++        dos.writeLong(value);
++    }
 +
 +}
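
A quick round trip through the serializer above, purely illustrative (the class name and values are invented); note that an empty or null input deserializes to null:

    import java.io.ByteArrayOutputStream;

    import org.apache.nifi.processors.hadoop.util.LongSerDe;

    public class LongSerDeRoundTrip {
        public static void main(final String[] args) throws Exception {
            final LongSerDe serde = new LongSerDe();

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serde.serialize(123456789L, baos);                           // 8 bytes via DataOutputStream
            System.out.println(serde.deserialize(baos.toByteArray()));   // prints 123456789
            System.out.println(serde.deserialize(new byte[0]));          // prints null
        }
    }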

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
index 848831f,0000000..ca1c548
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@@ -1,44 -1,0 +1,44 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop.util;
 +
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.nio.charset.StandardCharsets;
 +
 +import org.apache.nifi.distributed.cache.client.Deserializer;
 +import org.apache.nifi.distributed.cache.client.Serializer;
 +import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 +import org.apache.nifi.distributed.cache.client.exception.SerializationException;
 +
 +public class StringSerDe implements Serializer<String>, Deserializer<String> {
 +
- 	@Override
- 	public String deserialize(final byte[] value) throws DeserializationException, IOException {
- 		if ( value == null ) {
- 			return null;
- 		}
- 		
- 		return new String(value, StandardCharsets.UTF_8);
- 	}
++    @Override
++    public String deserialize(final byte[] value) throws DeserializationException, IOException {
++        if ( value == null ) {
++            return null;
++        }
 +
- 	@Override
- 	public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- 		out.write(value.getBytes(StandardCharsets.UTF_8));
- 	}
++        return new String(value, StandardCharsets.UTF_8);
++    }
++
++    @Override
++    public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
++        out.write(value.getBytes(StandardCharsets.UTF_8));
++    }
 +
 +}
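
And the String counterpart, which can serve as both the Serializer and the Deserializer side of a cache lookup; again a made-up example, the payload is just the UTF-8 bytes of the String:

    import java.io.ByteArrayOutputStream;

    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    public class StringSerDeRoundTrip {
        public static void main(final String[] args) throws Exception {
            final StringSerDe serde = new StringSerDe();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serde.serialize("hdfs://nn:8020/tmp/file.txt", baos);        // UTF-8 bytes of the String
            System.out.println(serde.deserialize(baos.toByteArray()));   // prints the original String
        }
    }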

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index 975dc63,d816e8c..ea3bb63
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@@ -83,31 -86,15 +86,34 @@@ public interface DistributedMapCacheCli
      <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException;
  
      /**
 -     * @param <K> type of key
 -     * @param <V> type of value
 +     * Adds the specified key and value to the cache, overwriting any value that is
 +     * currently set.
-      * 
++     *
++     * @param <K> the key type
++     * @param <V> the value type
 +     * @param key The key to set
 +     * @param value The value to associate with the given Key
 +     * @param keySerializer the Serializer that will be used to serialize the key into bytes
 +     * @param valueSerializer the Serializer that will be used to serialize the value into bytes
-      * 
++     *
 +     * @throws IOException if unable to communicate with the remote instance
 +     * @throws NullPointerException if the key or either serializer is null
 +     */
 +    <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
-     
++
 +    /**
 +     * Returns the value in the cache for the given key, if one exists;
 +     * otherwise returns <code>null</code>
 +     *
-      * @param <K>
-      * @param <V>
++     * @param <K> the key type
++     * @param <V> the value type
       * @param key the key to lookup in the map
-      * @param keySerializer
-      * @param valueDeserializer
+      * @param keySerializer key serializer
+      * @param valueDeserializer value serializer
       *
-      * @return
-      * @throws IOException
+      * @return the value in the cache for the given key, if one exists;
+      * otherwise returns <code>null</code>
+      * @throws IOException ex
       */
      <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
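
A hedged sketch of how caller code might combine the new put(...) with get(...). The client instance would come from a DistributedMapCacheClientService controller service and is simply passed in here; the class and method names are hypothetical, and the SerDes are the helpers added elsewhere in this commit.

    import java.io.IOException;

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    import org.apache.nifi.processors.hadoop.util.LongSerDe;
    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    public class MapCacheClientUsage {

        private final StringSerDe stringSerDe = new StringSerDe();
        private final LongSerDe longSerDe = new LongSerDe();

        /** Overwrites whatever value is currently cached for this directory. */
        public void storeTimestamp(final DistributedMapCacheClient client, final String directory, final long timestamp) throws IOException {
            client.put(directory, timestamp, stringSerDe, longSerDe);
        }

        /** Returns the cached timestamp, or null if the key has never been set. */
        public Long fetchTimestamp(final DistributedMapCacheClient client, final String directory) throws IOException {
            return client.get(directory, stringSerDe, longSerDe);
        }
    }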
  

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 8903046,fad0adb..e9c6f1d
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@@ -22,9 -22,12 +22,14 @@@ import java.nio.ByteBuffer
  public interface MapCache {
  
      MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+ 
 +    MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException;
++
      boolean containsKey(ByteBuffer key) throws IOException;
+ 
      ByteBuffer get(ByteBuffer key) throws IOException;
+ 
      ByteBuffer remove(ByteBuffer key) throws IOException;
+ 
      void shutdown() throws IOException;
  }
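
A small sketch contrasting the existing putIfAbsent(...) with the put(...) this commit adds to the interface: put always overwrites, and the returned MapPutResult reports whether a previous value was replaced. The cache instance is assumed to be one of the implementations touched below; the class, method, and key/value contents here are invented.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.distributed.cache.server.map.MapCache;
    import org.apache.nifi.distributed.cache.server.map.MapPutResult;

    public class MapCachePutExample {

        public void overwriteExample(final MapCache cache) throws IOException {
            final ByteBuffer key = ByteBuffer.wrap("dir:/tmp".getBytes(StandardCharsets.UTF_8));
            final ByteBuffer first = ByteBuffer.wrap("1000".getBytes(StandardCharsets.UTF_8));
            final ByteBuffer second = ByteBuffer.wrap("2000".getBytes(StandardCharsets.UTF_8));

            cache.putIfAbsent(key, first);                         // succeeds only while the key is absent
            final MapPutResult result = cache.put(key, second);    // unconditionally replaces the value

            System.out.println("replaced an existing value: " + (result.getExistingValue() != null));
        }
    }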

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index cf8996c,943d6aa..13ed0df
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@@ -55,70 -55,63 +55,70 @@@ public class MapCacheServer extends Abs
          final String action = dis.readUTF();
          try {
              switch (action) {
 -                case "close": {
 -                    return false;
 -                }
 -                case "putIfAbsent": {
 -                    final byte[] key = readValue(dis);
 -                    final byte[] value = readValue(dis);
 -                    final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 -                    dos.writeBoolean(putResult.isSuccessful());
 -                    break;
 -                }
 -                case "containsKey": {
 -                    final byte[] key = readValue(dis);
 -                    final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
 -                    dos.writeBoolean(contains);
 -                    break;
 -                }
 -                case "getAndPutIfAbsent": {
 -                    final byte[] key = readValue(dis);
 -                    final byte[] value = readValue(dis);
 -
 -                    final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 -                    if (putResult.isSuccessful()) {
 -                        // Put was successful. There was no old value to get.
 -                        dos.writeInt(0);
 -                    } else {
 -                        // we didn't put. Write back the previous value
 -                        final byte[] byteArray = putResult.getExistingValue().array();
 -                        dos.writeInt(byteArray.length);
 -                        dos.write(byteArray);
 -                    }
 -
 -                    break;
 -                }
 -                case "get": {
 -                    final byte[] key = readValue(dis);
 -                    final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
 -                    if (existingValue == null) {
 -                        // there was no existing value; we did a "put".
 -                        dos.writeInt(0);
 -                    } else {
 -                        // a value already existed. we did not update the map
 -                        final byte[] byteArray = existingValue.array();
 -                        dos.writeInt(byteArray.length);
 -                        dos.write(byteArray);
 -                    }
 -
 -                    break;
 -                }
 -                case "remove": {
 -                    final byte[] key = readValue(dis);
 -                    final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
 -                    dos.writeBoolean(removed);
 -                    break;
 +            case "close": {
 +                return false;
 +            }
 +            case "putIfAbsent": {
 +                final byte[] key = readValue(dis);
 +                final byte[] value = readValue(dis);
 +                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 +                dos.writeBoolean(putResult.isSuccessful());
 +                break;
 +            }
 +            case "put": {
-             	final byte[] key = readValue(dis);
-             	final byte[] value = readValue(dis);
-             	cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
++                final byte[] key = readValue(dis);
++                final byte[] value = readValue(dis);
++                cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 +                dos.writeBoolean(true);
-             	break;
++                break;
 +            }
 +            case "containsKey": {
 +                final byte[] key = readValue(dis);
 +                final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
 +                dos.writeBoolean(contains);
 +                break;
 +            }
 +            case "getAndPutIfAbsent": {
 +                final byte[] key = readValue(dis);
 +                final byte[] value = readValue(dis);
 +
 +                final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 +                if (putResult.isSuccessful()) {
 +                    // Put was successful. There was no old value to get.
 +                    dos.writeInt(0);
 +                } else {
 +                    // we didn't put. Write back the previous value
 +                    final byte[] byteArray = putResult.getExistingValue().array();
 +                    dos.writeInt(byteArray.length);
 +                    dos.write(byteArray);
                  }
 -                default: {
 -                    throw new IOException("Illegal Request");
 +
 +                break;
 +            }
 +            case "get": {
 +                final byte[] key = readValue(dis);
 +                final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
 +                if (existingValue == null) {
 +                    // there was no existing value; we did a "put".
 +                    dos.writeInt(0);
 +                } else {
 +                    // a value already existed. we did not update the map
 +                    final byte[] byteArray = existingValue.array();
 +                    dos.writeInt(byteArray.length);
 +                    dos.write(byteArray);
                  }
 +
 +                break;
 +            }
 +            case "remove": {
 +                final byte[] key = readValue(dis);
 +                final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
 +                dos.writeBoolean(removed);
 +                break;
 +            }
 +            default: {
 +                throw new IOException("Illegal Request");
 +            }
              }
          } finally {
              dos.flush();
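
For the new "put" action handled above, the client side of the exchange would look roughly like the sketch below. The framing used by readValue(...) is not shown in this diff, so the length-prefix layout here (an int length followed by that many bytes) is an assumption, mirrored from the way the server writes values back; the class and method are hypothetical.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PutActionSketch {

        public boolean sendPut(final DataOutputStream dos, final DataInputStream dis,
                final byte[] key, final byte[] value) throws IOException {
            dos.writeUTF("put");          // action name that MapCacheServer switches on

            dos.writeInt(key.length);     // assumed framing: length prefix, then the bytes
            dos.write(key);
            dos.writeInt(value.length);
            dos.write(value);
            dos.flush();

            return dis.readBoolean();     // the server answers the put with writeBoolean(true)
        }
    }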

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 82b1787,e821fbf..663f441
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@@ -75,33 -75,9 +75,33 @@@ public class PersistentMapCache impleme
                  wali.checkpoint();
              }
          }
-         
+ 
          return putResult;
      }
 +    
 +    @Override
 +    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
 +    	final MapPutResult putResult = wrapped.put(key, value);
 +        if ( putResult.isSuccessful() ) {
 +            // The put was successful.
 +            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
 +            final List<MapWaliRecord> records = new ArrayList<>();
 +            records.add(record);
 +
 +            if ( putResult.getEvictedKey() != null ) {
 +                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
 +            }
 +            
 +            wali.update(Collections.singletonList(record), false);
 +            
 +            final long modCount = modifications.getAndIncrement();
 +            if ( modCount > 0 && modCount % 100000 == 0 ) {
 +                wali.checkpoint();
 +            }
 +        }
 +        
 +        return putResult;
 +    }
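
One note on the put(...) added above: the records list is built to carry a DELETE entry for any evicted key, yet only the singleton CREATE record is handed to wali.update(...). If the intent is for the eviction to be journaled as well, as the list construction suggests, the tail of the method would read like this sketch (an assumption about intent, not something this commit does):

            if ( putResult.getEvictedKey() != null ) {
                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
            }

            wali.update(records, false);   // journal the eviction along with the new entry
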
  
      @Override
      public boolean containsKey(final ByteBuffer key) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index d8f9c45,9e8bbd1..b167c62
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@@ -105,29 -106,7 +106,29 @@@ public class SimpleMapCache implements 
              writeLock.unlock();
          }
      }
-     
+ 
 +
 +    @Override
 +    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
 +        writeLock.lock();
 +        try {
-         	// evict if we need to in order to make room for a new entry.
++            // evict if we need to in order to make room for a new entry.
 +            final MapCacheRecord evicted = evict();
 +
 +            final MapCacheRecord record = new MapCacheRecord(key, value);
-         	final MapCacheRecord existing = cache.put(key, record);
-         	inverseCacheMap.put(record, key);
-         	
-         	final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
-         	final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
-         	final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
-         	
-         	return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
++            final MapCacheRecord existing = cache.put(key, record);
++            inverseCacheMap.put(record, key);
++
++            final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
++            final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
++            final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
++
++            return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
-     
++
      @Override
      public boolean containsKey(final ByteBuffer key) {
          readLock.lock();