Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/19 07:20:53 UTC

[10/24] nifi git commit: NIFI-1054: Fixing Line endings of source code
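
For context, "fixing line endings" here means converting Windows-style CRLF sequences to Unix-style LF across the source tree; the file contents below are otherwise unchanged, which is why each hunk removes and re-adds the whole file. As a rough illustration only (not the tooling actually used for this commit), a normalization pass over a single file might look like the following Java sketch; the class name and argument handling are hypothetical.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class NormalizeLineEndings {
    public static void main(final String[] args) throws IOException {
        // Hypothetical entry point: takes one path argument naming the file to normalize.
        final Path source = Paths.get(args[0]);

        // Read the file as UTF-8 text and replace CRLF with LF.
        final String original = new String(Files.readAllBytes(source), StandardCharsets.UTF_8);
        final String normalized = original.replace("\r\n", "\n");

        // Rewrite the file only if something actually changed.
        if (!normalized.equals(original)) {
            Files.write(source, normalized.getBytes(StandardCharsets.UTF_8));
        }
    }
}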

http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 6434e5e..6b04910 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -1,131 +1,131 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.util.StopWatch;
-
-@SupportsBatching
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
-@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
-        + "The file in HDFS is left intact without any changes being made to it.")
-@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
-        + "not be fetched from HDFS")
-@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
-public class FetchHDFS extends AbstractHadoopProcessor {
-    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
-        .name("HDFS Filename")
-        .description("The name of the HDFS file to retrieve")
-        .required(true)
-        .expressionLanguageSupported(true)
-        .defaultValue("${path}/${filename}")
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .build();
-
-    static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file")
-        .build();
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. "
-                + "This would occur, for instance, if the file is not found or if there is a permissions issue")
-        .build();
-    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
-        .name("comms.failure")
-        .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. "
-                + "This generally indicates that the Fetch should be tried again.")
-        .build();
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(HADOOP_CONFIGURATION_RESOURCES);
-        properties.add(FILENAME);
-        properties.add(KERBEROS_PRINCIPAL);
-        properties.add(KERBEROS_KEYTAB);
-        properties.add(KERBEROS_RELOGIN_PERIOD);
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
-        relationships.add(REL_COMMS_FAILURE);
-        return relationships;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
-            return;
-        }
-
-        final FileSystem hdfs = getFileSystem();
-        final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
-        final URI uri = path.toUri();
-
-        final StopWatch stopWatch = new StopWatch(true);
-        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
-            flowFile = session.importFrom(inStream, flowFile);
-            stopWatch.stop();
-            getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
-            session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-        } catch (final FileNotFoundException | AccessControlException e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
-            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-        } catch (final IOException e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_COMMS_FAILURE);
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
+@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
+        + "The file in HDFS is left intact without any changes being made to it.")
+@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
+        + "not be fetched from HDFS")
+@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
+public class FetchHDFS extends AbstractHadoopProcessor {
+    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+        .name("HDFS Filename")
+        .description("The name of the HDFS file to retrieve")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .defaultValue("${path}/${filename}")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. "
+                + "This would occur, for instance, if the file is not found or if there is a permissions issue")
+        .build();
+    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
+        .name("comms.failure")
+        .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. "
+                + "This generally indicates that the Fetch should be tried again.")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HADOOP_CONFIGURATION_RESOURCES);
+        properties.add(FILENAME);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_KEYTAB);
+        properties.add(KERBEROS_RELOGIN_PERIOD);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_COMMS_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final FileSystem hdfs = getFileSystem();
+        final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
+        final URI uri = path.toUri();
+
+        final StopWatch stopWatch = new StopWatch(true);
+        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
+            flowFile = session.importFrom(inStream, flowFile);
+            stopWatch.stop();
+            getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
+            session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final FileNotFoundException | AccessControlException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (final IOException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_COMMS_FAILURE);
+        }
+    }
+
+}
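
To see the FetchHDFS behavior shown in the hunk above in action, a unit-style exercise with NiFi's mock framework is a convenient reference. The sketch below is illustrative only and not part of this commit: the test class name and local file path are hypothetical, it assumes nifi-mock and JUnit 4 on the test classpath, it lives in the same package so it can reference the package-private FILENAME and REL_SUCCESS members shown above, and it assumes that with no Hadoop Configuration Resources set the default FileSystem resolves to the local filesystem.

package org.apache.nifi.processors.hadoop;

import java.io.File;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class FetchHDFSUsageSketch {

    @Test
    public void fetchLocalFileIntoFlowFile() {
        final TestRunner runner = TestRunners.newTestRunner(FetchHDFS.class);

        // Hypothetical local file standing in for an HDFS path; with no Hadoop
        // configuration resources configured, the default FileSystem is file:///.
        final File file = new File("src/test/resources/testdata/randombytes-1");
        runner.setProperty(FetchHDFS.FILENAME, file.getAbsolutePath());

        // FetchHDFS requires an incoming FlowFile; its content is replaced by the fetched file.
        runner.enqueue(new byte[0]);
        runner.run();

        // The updated FlowFile should be routed to the 'success' relationship.
        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
    }
}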

http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 0fae4ca..ea2d397 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -1,487 +1,487 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.hadoop.util.HDFSListing;
-import org.apache.nifi.processors.hadoop.util.StringSerDe;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-
-@TriggerSerially
-@TriggerWhenEmpty
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
-@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
-        + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
-        + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
-        + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
-@WritesAttributes({
-    @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
-    @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
-            + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
-            + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
-    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
-    @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
-    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
-            + "3 for the group, and 3 for other users. For example rw-rw-r--")
-})
-@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
-public class ListHDFS extends AbstractHadoopProcessor {
-    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
-        .name("Distributed Cache Service")
-        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
-                + "begins pulling data, it won't duplicate all of the work that has been done.")
-        .required(true)
-        .identifiesControllerService(DistributedMapCacheClient.class)
-        .build();
-
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-        .name(DIRECTORY_PROP_NAME)
-        .description("The HDFS directory from which files should be read")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .build();
-
-    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
-        .name("Recurse Subdirectories")
-        .description("Indicates whether to list files from subdirectories of the HDFS directory")
-        .required(true)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
-
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles are transferred to this relationship")
-        .build();
-
-    private volatile Long lastListingTime = null;
-    private volatile Set<Path> latestPathsListed = new HashSet<>();
-    private volatile boolean electedPrimaryNode = false;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        super.init(context);
-    }
-
-    protected File getPersistenceFile() {
-        return new File("conf/state/" + getIdentifier());
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(HADOOP_CONFIGURATION_RESOURCES);
-        properties.add(DISTRIBUTED_CACHE_SERVICE);
-        properties.add(DIRECTORY);
-        properties.add(RECURSE_SUBDIRS);
-        properties.add(KERBEROS_PRINCIPAL);
-        properties.add(KERBEROS_KEYTAB);
-        properties.add(KERBEROS_RELOGIN_PERIOD);
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        return relationships;
-    }
-
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
-    @OnPrimaryNodeStateChange
-    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
-            electedPrimaryNode = true;
-        }
-    }
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        if ( descriptor.equals(DIRECTORY) ) {
-            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
-            latestPathsListed = new HashSet<>();
-        }
-    }
-
-    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode jsonNode = mapper.readTree(serializedState);
-        return mapper.readValue(jsonNode, HDFSListing.class);
-    }
-
-
-    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
-        // Determine the timestamp for the last file that we've listed.
-        Long minTimestamp = lastListingTime;
-        if ( minTimestamp == null || electedPrimaryNode ) {
-            // We haven't yet restored any state from local or distributed storage, or this node was just
-            // elected Primary Node. In either case, first attempt to get the timestamp of the last listing
-            // from the distributed cache service.
-            try {
-                final StringSerDe serde = new StringSerDe();
-                final String serializedState = client.get(getKey(directory), serde, serde);
-                if ( serializedState == null || serializedState.isEmpty() ) {
-                    minTimestamp = null;
-                    this.latestPathsListed = Collections.emptySet();
-                } else {
-                    final HDFSListing listing = deserialize(serializedState);
-                    this.lastListingTime = listing.getLatestTimestamp().getTime();
-                    minTimestamp = listing.getLatestTimestamp().getTime();
-                    this.latestPathsListed = listing.toPaths();
-                }
-
-                this.lastListingTime = minTimestamp;
-                electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
-            } catch (final IOException ioe) {
-                throw ioe;
-            }
-
-            // Check the persistence file. We want to use the latest timestamp that we have so that
-            // we don't duplicate data.
-            try {
-                final File persistenceFile = getPersistenceFile();
-                if ( persistenceFile.exists() ) {
-                    try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
-                        final Properties props = new Properties();
-                        props.load(fis);
-
-                        // get the local timestamp for this directory, if it exists.
-                        final String locallyPersistedValue = props.getProperty(directory);
-                        if ( locallyPersistedValue != null ) {
-                            final HDFSListing listing = deserialize(locallyPersistedValue);
-                            final long localTimestamp = listing.getLatestTimestamp().getTime();
-
-                            // If distributed state doesn't have an entry or the local entry is later than the distributed state,
-                            // update the distributed state so that we are in sync.
-                            if (minTimestamp == null || localTimestamp > minTimestamp) {
-                                minTimestamp = localTimestamp;
-
-                                // Our local persistence file shows a later time than the Distributed service.
-                                // Update the distributed service to match our local state.
-                                try {
-                                    final StringSerDe serde = new StringSerDe();
-                                    client.put(getKey(directory), locallyPersistedValue, serde, serde);
-                                } catch (final IOException ioe) {
-                                    getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
-                                            + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
-                                            new Object[] {directory, locallyPersistedValue, ioe});
-                                }
-                            }
-                        }
-                    }
-                }
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
-            }
-        }
-
-        return minTimestamp;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final String directory = context.getProperty(DIRECTORY).getValue();
-        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
-        final Long minTimestamp;
-        try {
-            minTimestamp = getMinTimestamp(directory, client);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
-            context.yield();
-            return;
-        }
-
-        // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        final Path rootPath = new Path(directory);
-
-        int listCount = 0;
-        Long latestListingModTime = null;
-        final Set<FileStatus> statuses;
-        try {
-            statuses = getStatuses(rootPath, recursive, hdfs);
-            for ( final FileStatus status : statuses ) {
-                // don't get anything where the last modified timestamp is equal to our current timestamp.
-                // if we do, then we run the risk of multiple files having the same last mod date but us only
-                // seeing a portion of them.
-                // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
-                // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
-                // modified date not before the current time.
-                final long fileModTime = status.getModificationTime();
-
-                // we only want the file if its timestamp is later than minTimestamp, or equal to minTimestamp and we didn't pull it last time.
-                // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
-                boolean fetch = !status.getPath().getName().endsWith("_COPYING_")
-                        && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
-
-                // Create the FlowFile for this path.
-                if ( fetch ) {
-                    final Map<String, String> attributes = createAttributes(status);
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    session.transfer(flowFile, REL_SUCCESS);
-                    listCount++;
-
-                    if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
-                        latestListingModTime = fileModTime;
-                    }
-                }
-            }
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
-            return;
-        }
-
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
-            session.commit();
-
-            // We have performed a listing and pushed the FlowFiles out.
-            // Now, we need to persist state about the Last Modified timestamp of the newest file
-            // that we pulled in. We do this in order to avoid pulling in the same file twice.
-            // However, we want to save the state both locally and remotely.
-            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
-            // previous Primary Node left off.
-            // We also store the state locally so that if the node is restarted, and the node cannot contact
-            // the distributed state cache, the node can continue to run (if it is primary node).
-            String serializedState = null;
-            try {
-                serializedState = serializeState(latestListingModTime, statuses);
-            } catch (final Exception e) {
-                getLogger().error("Failed to serialize state due to {}", new Object[] {e});
-            }
-
-            if ( serializedState != null ) {
-                // Save our state locally.
-                try {
-                    persistLocalState(directory, serializedState);
-                } catch (final IOException ioe) {
-                    getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
-                }
-
-                // Attempt to save state to remote server.
-                try {
-                    client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
-                } catch (final IOException ioe) {
-                    getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
-                }
-            }
-
-            lastListingTime = latestListingModTime;
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-
-            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
-            if ( lastListingTime == null ) {
-                lastListingTime = 0L;
-            }
-
-            return;
-        }
-    }
-
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
-        final Set<FileStatus> statusSet = new HashSet<>();
-
-        final FileStatus[] statuses = hdfs.listStatus(path);
-
-        for ( final FileStatus status : statuses ) {
-            if ( status.isDirectory() ) {
-                if ( recursive ) {
-                    try {
-                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
-                    } catch (final IOException ioe) {
-                        getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
-                    }
-                }
-            } else {
-                statusSet.add(status);
-            }
-        }
-
-        return statusSet;
-    }
-
-
-    private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
-        // we need to keep track of all files that we pulled in that had a modification time equal to
-        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
-        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
-        // later in the same millisecond.
-        if ( statuses.isEmpty() ) {
-            return null;
-        } else {
-            final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
-            Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
-                @Override
-                public int compare(final FileStatus o1, final FileStatus o2) {
-                    return Long.compare(o1.getModificationTime(), o2.getModificationTime());
-                }
-            });
-
-            final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
-            final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
-            for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
-                final FileStatus status = sortedStatuses.get(i);
-                if (status.getModificationTime() == latestListingModTime) {
-                    pathsWithModTimeEqualToListingModTime.add(status.getPath());
-                }
-            }
-
-            this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
-
-            final HDFSListing listing = new HDFSListing();
-            listing.setLatestTimestamp(new Date(latestListingModTime));
-            final Set<String> paths = new HashSet<>();
-            for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
-                paths.add(path.toUri().toString());
-            }
-            listing.setMatchingPaths(paths);
-
-            final ObjectMapper mapper = new ObjectMapper();
-            final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
-            return serializedState;
-        }
-    }
-
-    protected void persistLocalState(final String directory, final String serializedState) throws IOException {
-        // we need to keep track of all files that we pulled in that had a modification time equal to
-        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
-        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
-        // later in the same millisecond.
-        final File persistenceFile = getPersistenceFile();
-        final File dir = persistenceFile.getParentFile();
-        if ( !dir.exists() && !dir.mkdirs() ) {
-            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
-        }
-
-        final Properties props = new Properties();
-        if ( persistenceFile.exists() ) {
-            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
-                props.load(fis);
-            }
-        }
-
-        props.setProperty(directory, serializedState);
-
-        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
-            props.store(fos, null);
-        }
-    }
-
-    private String getAbsolutePath(final Path path) {
-        final Path parent = path.getParent();
-        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
-        return prefix + "/" + path.getName();
-    }
-
-    private Map<String, String> createAttributes(final FileStatus status) {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
-        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
-
-        attributes.put("hdfs.owner", status.getOwner());
-        attributes.put("hdfs.group", status.getGroup());
-        attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
-        attributes.put("hdfs.length", String.valueOf(status.getLen()));
-        attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
-
-        final FsPermission permission = status.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        attributes.put("hdfs.permissions", perms);
-        return attributes;
-    }
-
-    private String getPerms(final FsAction action) {
-        final StringBuilder sb = new StringBuilder();
-        if ( action.implies(FsAction.READ) ) {
-            sb.append("r");
-        } else {
-            sb.append("-");
-        }
-
-        if ( action.implies(FsAction.WRITE) ) {
-            sb.append("w");
-        } else {
-            sb.append("-");
-        }
-
-        if ( action.implies(FsAction.EXECUTE) ) {
-            sb.append("x");
-        } else {
-            sb.append("-");
-        }
-
-        return sb.toString();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.HDFSListing;
+import org.apache.nifi.processors.hadoop.util.StringSerDe;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
+        + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
+        + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
+        + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
+@WritesAttributes({
+    @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
+    @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
+            + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
+            + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
+    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
+    @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
+    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+    @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
+    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
+    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
+            + "3 for the group, and 3 for other users. For example rw-rw-r--")
+})
+@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class ListHDFS extends AbstractHadoopProcessor {
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+        .name("Distributed Cache Service")
+        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
+                + "begins pulling data, it won't duplicate all of the work that has been done.")
+        .required(true)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+        .name(DIRECTORY_PROP_NAME)
+        .description("The HDFS directory from which files should be read")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
+        .name("Recurse Subdirectories")
+        .description("Indicates whether to list files from subdirectories of the HDFS directory")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are transferred to this relationship")
+        .build();
+
+    private volatile Long lastListingTime = null;
+    private volatile Set<Path> latestPathsListed = new HashSet<>();
+    private volatile boolean electedPrimaryNode = false;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+    }
+
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HADOOP_CONFIGURATION_RESOURCES);
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(DIRECTORY);
+        properties.add(RECURSE_SUBDIRS);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_KEYTAB);
+        properties.add(KERBEROS_RELOGIN_PERIOD);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    protected String getKey(final String directory) {
+        return getIdentifier() + ".lastListingTime." + directory;
+    }
+
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
+            electedPrimaryNode = true;
+        }
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if ( descriptor.equals(DIRECTORY) ) {
+            lastListingTime = null; // clear lastListingTime so that we have to fetch new time
+            latestPathsListed = new HashSet<>();
+        }
+    }
+
+    private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(serializedState);
+        return mapper.readValue(jsonNode, HDFSListing.class);
+    }
+
+
+    private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
+        // Determine the timestamp for the last file that we've listed.
+        Long minTimestamp = lastListingTime;
+        if ( minTimestamp == null || electedPrimaryNode ) {
+            // We haven't yet restored any state from local or distributed storage, or this node was just
+            // elected Primary Node. In either case, first attempt to get the timestamp of the last listing
+            // from the distributed cache service.
+            try {
+                final StringSerDe serde = new StringSerDe();
+                final String serializedState = client.get(getKey(directory), serde, serde);
+                if ( serializedState == null || serializedState.isEmpty() ) {
+                    minTimestamp = null;
+                    this.latestPathsListed = Collections.emptySet();
+                } else {
+                    final HDFSListing listing = deserialize(serializedState);
+                    this.lastListingTime = listing.getLatestTimestamp().getTime();
+                    minTimestamp = listing.getLatestTimestamp().getTime();
+                    this.latestPathsListed = listing.toPaths();
+                }
+
+                this.lastListingTime = minTimestamp;
+                electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
+            } catch (final IOException ioe) {
+                throw ioe;
+            }
+
+            // Check the persistence file. We want to use the latest timestamp that we have so that
+            // we don't duplicate data.
+            try {
+                final File persistenceFile = getPersistenceFile();
+                if ( persistenceFile.exists() ) {
+                    try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                        final Properties props = new Properties();
+                        props.load(fis);
+
+                        // get the local timestamp for this directory, if it exists.
+                        final String locallyPersistedValue = props.getProperty(directory);
+                        if ( locallyPersistedValue != null ) {
+                            final HDFSListing listing = deserialize(locallyPersistedValue);
+                            final long localTimestamp = listing.getLatestTimestamp().getTime();
+
+                            // If distributed state doesn't have an entry or the local entry is later than the distributed state,
+                            // update the distributed state so that we are in sync.
+                            if (minTimestamp == null || localTimestamp > minTimestamp) {
+                                minTimestamp = localTimestamp;
+
+                                // Our local persistence file shows a later time than the Distributed service.
+                                // Update the distributed service to match our local state.
+                                try {
+                                    final StringSerDe serde = new StringSerDe();
+                                    client.put(getKey(directory), locallyPersistedValue, serde, serde);
+                                } catch (final IOException ioe) {
+                                    getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
+                                            + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
+                                            new Object[] {directory, locallyPersistedValue, ioe});
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
+            }
+        }
+
+        return minTimestamp;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final String directory = context.getProperty(DIRECTORY).getValue();
+        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(directory, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+            context.yield();
+            return;
+        }
+
+        // Pull in any file that is newer than the timestamp that we have.
+        final FileSystem hdfs = getFileSystem();
+        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        final Path rootPath = new Path(directory);
+
+        int listCount = 0;
+        Long latestListingModTime = null;
+        final Set<FileStatus> statuses;
+        try {
+            statuses = getStatuses(rootPath, recursive, hdfs);
+            for ( final FileStatus status : statuses ) {
+                // don't get anything where the last modified timestamp is equal to our current timestamp.
+                // if we do, then we run the risk of multiple files having the same last mod date but us only
+                // seeing a portion of them.
+                // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
+                // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
+                // modified date not before the current time.
+                final long fileModTime = status.getModificationTime();
+
+                // we only want the file if its timestamp is later than minTimestamp, or equal to minTimestamp and we didn't pull it last time.
+                // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
+                boolean fetch = !status.getPath().getName().endsWith("_COPYING_")
+                        && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
+
+                // Create the FlowFile for this path.
+                if ( fetch ) {
+                    final Map<String, String> attributes = createAttributes(status);
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    listCount++;
+
+                    if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
+                        latestListingModTime = fileModTime;
+                    }
+                }
+            }
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
+            return;
+        }
+
+        if ( listCount > 0 ) {
+            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
+            session.commit();
+
+            // We have performed a listing and pushed the FlowFiles out.
+            // Now, we need to persist state about the Last Modified timestamp of the newest file
+            // that we pulled in. We do this in order to avoid pulling in the same file twice.
+            // However, we want to save the state both locally and remotely.
+            // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
+            // previous Primary Node left off.
+            // We also store the state locally so that if the node is restarted, and the node cannot contact
+            // the distributed state cache, the node can continue to run (if it is primary node).
+            String serializedState = null;
+            try {
+                serializedState = serializeState(latestListingModTime, statuses);
+            } catch (final Exception e) {
+                getLogger().error("Failed to serialize state due to {}", new Object[] {e});
+            }
+
+            if ( serializedState != null ) {
+                // Save our state locally.
+                try {
+                    persistLocalState(directory, serializedState);
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
+                }
+
+                // Attempt to save state to remote server.
+                try {
+                    client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to communicate with distributed cache server due to {}. State was persisted locally, but the remote state could not be updated.", ioe);
+                }
+            }
+
+            lastListingTime = latestListingModTime;
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
+
+            // Set lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
+            if ( lastListingTime == null ) {
+                lastListingTime = 0L;
+            }
+
+            return;
+        }
+    }
+
+    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
+        final Set<FileStatus> statusSet = new HashSet<>();
+
+        final FileStatus[] statuses = hdfs.listStatus(path);
+
+        for ( final FileStatus status : statuses ) {
+            if ( status.isDirectory() ) {
+                if ( recursive ) {
+                    try {
+                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
+                    } catch (final IOException ioe) {
+                        getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
+                    }
+                }
+            } else {
+                statusSet.add(status);
+            }
+        }
+
+        return statusSet;
+    }
+
+
+    private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
+        // we need to keep track of all files that we pulled in that had a modification time equal to
+        // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
+        // that have a mod time equal to that timestamp because more files may come in with the same timestamp
+        // later in the same millisecond.
+        if ( statuses.isEmpty() ) {
+            return null;
+        } else {
+            final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
+            Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
+                @Override
+                public int compare(final FileStatus o1, final FileStatus o2) {
+                    return Long.compare(o1.getModificationTime(), o2.getModificationTime());
+                }
+            });
+
+            final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
+            final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
+            for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
+                final FileStatus status = sortedStatuses.get(i);
+                if (status.getModificationTime() == latestListingModTime) {
+                    pathsWithModTimeEqualToListingModTime.add(status.getPath());
+                }
+            }
+
+            this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
+
+            final HDFSListing listing = new HDFSListing();
+            listing.setLatestTimestamp(new Date(latestListingModTime));
+            final Set<String> paths = new HashSet<>();
+            for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
+                paths.add(path.toUri().toString());
+            }
+            listing.setMatchingPaths(paths);
+
+            final ObjectMapper mapper = new ObjectMapper();
+            final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
+            return serializedState;
+        }
+    }
+
+    protected void persistLocalState(final String directory, final String serializedState) throws IOException {
+        // Persist the serialized listing state for this directory to a local properties file so that,
+        // if the node is restarted or cannot reach the distributed cache, the last listing state can
+        // still be recovered and the same files are not pulled in a second time.
+        final File persistenceFile = getPersistenceFile();
+        final File dir = persistenceFile.getParentFile();
+        if ( !dir.exists() && !dir.mkdirs() ) {
+            throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
+        }
+
+        final Properties props = new Properties();
+        if ( persistenceFile.exists() ) {
+            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                props.load(fis);
+            }
+        }
+
+        props.setProperty(directory, serializedState);
+
+        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+            props.store(fos, null);
+        }
+    }
+
+    private String getAbsolutePath(final Path path) {
+        final Path parent = path.getParent();
+        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
+        return prefix + "/" + path.getName();
+    }
+
+    private Map<String, String> createAttributes(final FileStatus status) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
+        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
+
+        attributes.put("hdfs.owner", status.getOwner());
+        attributes.put("hdfs.group", status.getGroup());
+        attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
+        attributes.put("hdfs.length", String.valueOf(status.getLen()));
+        attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
+
+        final FsPermission permission = status.getPermission();
+        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
+        attributes.put("hdfs.permissions", perms);
+        return attributes;
+    }
+
+    private String getPerms(final FsAction action) {
+        final StringBuilder sb = new StringBuilder();
+        if ( action.implies(FsAction.READ) ) {
+            sb.append("r");
+        } else {
+            sb.append("-");
+        }
+
+        if ( action.implies(FsAction.WRITE) ) {
+            sb.append("w");
+        } else {
+            sb.append("-");
+        }
+
+        if ( action.implies(FsAction.EXECUTE) ) {
+            sb.append("x");
+        } else {
+            sb.append("-");
+        }
+
+        return sb.toString();
+    }
+}
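
The subtle part of the listing above is the fetch decision: a file is emitted only when its modification time is strictly newer than the last listing timestamp, or equal to it but not among the paths already emitted at that timestamp. Below is a minimal, self-contained sketch of that check; the class name, timestamps, and paths are hypothetical, and only the condition itself mirrors the processor code.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    public class FetchDecisionSketch {

        // Mirrors the condition used in onTrigger above: skip in-flight _COPYING_ files and
        // anything already listed at (or before) the last listing timestamp.
        static boolean shouldFetch(final FileStatus status, final Long minTimestamp, final Set<Path> latestPathsListed) {
            final long fileModTime = status.getModificationTime();
            return !status.getPath().getName().endsWith("_COPYING_")
                    && (minTimestamp == null
                        || fileModTime > minTimestamp
                        || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
        }

        public static void main(final String[] args) {
            // Hypothetical scenario: two files share the modification time of the previous listing;
            // only the one that was not emitted last time is picked up now.
            final long lastListingTime = 1000L;
            final Set<Path> latestPathsListed = new HashSet<>();
            latestPathsListed.add(new Path("/data/a.txt"));

            final FileStatus alreadySeen = new FileStatus(0, false, 1, 0, lastListingTime, new Path("/data/a.txt"));
            final FileStatus newArrival = new FileStatus(0, false, 1, 0, lastListingTime, new Path("/data/b.txt"));

            System.out.println(shouldFetch(alreadySeen, lastListingTime, latestPathsListed)); // false
            System.out.println(shouldFetch(newArrival, lastListingTime, latestPathsListed));  // true
        }
    }

Tracking the exact set of paths emitted at the latest timestamp is what lets equal timestamps be handled without emitting duplicates.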

http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
index 49957f5..a4d957a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@ -1,83 +1,83 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop.util;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.xml.bind.annotation.XmlTransient;
-import javax.xml.bind.annotation.XmlType;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
- * we can avoid pulling the same file multiple times
- */
-@XmlType(name = "listing")
-public class HDFSListing {
-    private Date latestTimestamp;
-    private Collection<String> matchingPaths;
-
-    /**
-     * @return the modification date of the newest file that was contained in the HDFS Listing
-     */
-    public Date getLatestTimestamp() {
-        return latestTimestamp;
-    }
-
-    /**
-     * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
-     *
-     * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
-     */
-    public void setLatestTimestamp(Date latestTimestamp) {
-        this.latestTimestamp = latestTimestamp;
-    }
-
-    /**
-     * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
-     * was equal to {@link #getLatestTimestamp()}
-     */
-    @XmlTransient
-    public Collection<String> getMatchingPaths() {
-        return matchingPaths;
-    }
-
-    /**
-     * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
-     */
-    public Set<Path> toPaths() {
-        final Set<Path> paths = new HashSet<>(matchingPaths.size());
-        for ( final String pathname : matchingPaths ) {
-            paths.add(new Path(pathname));
-        }
-        return paths;
-    }
-
-    /**
-     * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
-     * equal to {@link #getLatestTimestamp()}
-     * @param matchingPaths the paths that have last modified date matching the latest timestamp
-     */
-    public void setMatchingPaths(Collection<String> matchingPaths) {
-        this.matchingPaths = matchingPaths;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
+ * we can avoid pulling the same file multiple times
+ */
+@XmlType(name = "listing")
+public class HDFSListing {
+    private Date latestTimestamp;
+    private Collection<String> matchingPaths;
+
+    /**
+     * @return the modification date of the newest file that was contained in the HDFS Listing
+     */
+    public Date getLatestTimestamp() {
+        return latestTimestamp;
+    }
+
+    /**
+     * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
+     *
+     * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
+     */
+    public void setLatestTimestamp(Date latestTimestamp) {
+        this.latestTimestamp = latestTimestamp;
+    }
+
+    /**
+     * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
+     * was equal to {@link #getLatestTimestamp()}
+     */
+    @XmlTransient
+    public Collection<String> getMatchingPaths() {
+        return matchingPaths;
+    }
+
+    /**
+     * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
+     */
+    public Set<Path> toPaths() {
+        final Set<Path> paths = new HashSet<>(matchingPaths.size());
+        for ( final String pathname : matchingPaths ) {
+            paths.add(new Path(pathname));
+        }
+        return paths;
+    }
+
+    /**
+     * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
+     * equal to {@link #getLatestTimestamp()}
+     * @param matchingPaths the paths that have last modified date matching the latest timestamp
+     */
+    public void setMatchingPaths(Collection<String> matchingPaths) {
+        this.matchingPaths = matchingPaths;
+    }
+
+}
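
The state that ListHDFS writes to the distributed cache and to its local properties file is simply this HDFSListing rendered as JSON. A minimal round-trip sketch follows, assuming a com.fasterxml.jackson.databind.ObjectMapper (the exact Jackson package used by the processor is not visible in this hunk) and made-up timestamp and paths.

    import java.util.Arrays;
    import java.util.Date;
    import java.util.HashSet;

    import com.fasterxml.jackson.databind.ObjectMapper; // assumed Jackson flavor

    import org.apache.nifi.processors.hadoop.util.HDFSListing;

    public class HdfsListingStateSketch {

        public static void main(final String[] args) throws Exception {
            // Equivalent to what serializeState() stores: the newest modification timestamp plus
            // every path that shares that exact timestamp.
            final HDFSListing listing = new HDFSListing();
            listing.setLatestTimestamp(new Date(1000L)); // hypothetical timestamp
            listing.setMatchingPaths(new HashSet<>(Arrays.asList(
                    "hdfs://nn:8020/data/a.txt", "hdfs://nn:8020/data/b.txt"))); // hypothetical paths

            final ObjectMapper mapper = new ObjectMapper();
            // Whether matchingPaths appears in the JSON depends on how the mapper treats the
            // @XmlTransient getter; a plain ObjectMapper ignores JAXB annotations.
            final String serialized = mapper.writeValueAsString(listing);

            // Reading the state back, e.g. after a restart or on a newly elected primary node.
            final HDFSListing restored = mapper.readValue(serialized, HDFSListing.class);
            System.out.println(serialized);
            System.out.println(restored.getLatestTimestamp() + " -> " + listing.toPaths());
        }
    }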

http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
index 229f26c..17cacd9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@ -1,48 +1,48 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop.util;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
-
-public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
-
-    @Override
-    public Long deserialize(final byte[] input) throws DeserializationException, IOException {
-        if ( input == null || input.length == 0 ) {
-            return null;
-        }
-
-        final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
-        return dis.readLong();
-    }
-
-    @Override
-    public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
-        final DataOutputStream dos = new DataOutputStream(out);
-        dos.writeLong(value);
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
+
+    @Override
+    public Long deserialize(final byte[] input) throws DeserializationException, IOException {
+        if ( input == null || input.length == 0 ) {
+            return null;
+        }
+
+        final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
+        return dis.readLong();
+    }
+
+    @Override
+    public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
+        final DataOutputStream dos = new DataOutputStream(out);
+        dos.writeLong(value);
+    }
+
+}
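
LongSerDe is a small helper for the distributed cache client that writes a Long in the 8-byte big-endian form of DataOutputStream.writeLong and reads it back, returning null for missing input. A quick round-trip sketch with an arbitrary value:

    import java.io.ByteArrayOutputStream;

    import org.apache.nifi.processors.hadoop.util.LongSerDe;

    public class LongSerDeSketch {

        public static void main(final String[] args) throws Exception {
            final LongSerDe serde = new LongSerDe();

            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            serde.serialize(1447916453000L, out); // arbitrary epoch-millis value

            System.out.println(serde.deserialize(out.toByteArray())); // 1447916453000
            System.out.println(serde.deserialize(new byte[0]));       // null for empty input
        }
    }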

http://git-wip-us.apache.org/repos/asf/nifi/blob/e2d3d1b7/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
index ca1c548..2a52c4d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@ -1,44 +1,44 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.hadoop.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
-
-public class StringSerDe implements Serializer<String>, Deserializer<String> {
-
-    @Override
-    public String deserialize(final byte[] value) throws DeserializationException, IOException {
-        if ( value == null ) {
-            return null;
-        }
-
-        return new String(value, StandardCharsets.UTF_8);
-    }
-
-    @Override
-    public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
-        out.write(value.getBytes(StandardCharsets.UTF_8));
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class StringSerDe implements Serializer<String>, Deserializer<String> {
+
+    @Override
+    public String deserialize(final byte[] value) throws DeserializationException, IOException {
+        if ( value == null ) {
+            return null;
+        }
+
+        return new String(value, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
+        out.write(value.getBytes(StandardCharsets.UTF_8));
+    }
+
+}
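
StringSerDe is the UTF-8 counterpart, used by ListHDFS above for both the cache key and the serialized listing value passed to client.put(). A quick round-trip sketch with an arbitrary string:

    import java.io.ByteArrayOutputStream;

    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    public class StringSerDeSketch {

        public static void main(final String[] args) throws Exception {
            final StringSerDe serde = new StringSerDe();

            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            serde.serialize("hello, NiFi", out); // arbitrary value

            System.out.println(serde.deserialize(out.toByteArray())); // hello, NiFi
            System.out.println(serde.deserialize(null));              // null for missing input
        }
    }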