Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/29 22:14:24 UTC
[02/13] incubator-nifi git commit: NIFI-533: Fixed code to conform to checkstyle
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 1dd5b91,361f1ed..f7894d9
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@@ -58,13 -58,18 +58,15 @@@ import org.apache.nifi.processor.except
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
-/**
- * This processor reads files from HDFS into NiFi FlowFiles.
- */
@TriggerWhenEmpty
@Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
-@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles")
+@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
@WritesAttributes({
- @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
- @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
+ @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
- @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if "
- + "the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse "
- + "Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
-@SeeAlso(PutHDFS.class)
++ @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property "
++ + "is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and "
++ + "a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
+@SeeAlso({PutHDFS.class, ListHDFS.class})
public class GetHDFS extends AbstractHadoopProcessor {
public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@@ -73,105 -78,101 +75,101 @@@
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
-- .name("success")
-- .description("All files retrieved from HDFS are transferred to this relationship")
-- .build();
++ .name("success")
++ .description("All files retrieved from HDFS are transferred to this relationship")
++ .build();
public static final Relationship REL_PASSTHROUGH = new Relationship.Builder()
-- .name("passthrough")
-- .description(
-- "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship")
++ .name("passthrough")
++ .description(
++ "If this processor has an input queue for some reason, then FlowFiles arriving on that input are transferred to this relationship")
.build();
// properties
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-- .name(DIRECTORY_PROP_NAME)
-- .description("The HDFS directory from which files should be read")
-- .required(true)
-- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-- .build();
++ .name(DIRECTORY_PROP_NAME)
++ .description("The HDFS directory from which files should be read")
++ .required(true)
++ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
++ .build();
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
-- .name("Recurse Subdirectories")
-- .description("Indicates whether to pull files from subdirectories of the HDFS directory")
-- .required(true)
-- .allowableValues("true", "false")
-- .defaultValue("true")
-- .build();
++ .name("Recurse Subdirectories")
++ .description("Indicates whether to pull files from subdirectories of the HDFS directory")
++ .required(true)
++ .allowableValues("true", "false")
++ .defaultValue("true")
++ .build();
public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
-- .name("Keep Source File")
- .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.")
- .description("Determines whether to delete the file from HDFS after it has been successfully transferred")
-- .required(true)
-- .allowableValues("true", "false")
-- .defaultValue("false")
-- .build();
++ .name("Keep Source File")
++ .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.")
++ .required(true)
++ .allowableValues("true", "false")
++ .defaultValue("false")
++ .build();
public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
-- .name("File Filter Regex")
- .description(
- "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched")
- .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
- + "Expression will be fetched, otherwise all files will be fetched")
++ .name("File Filter Regex")
++ .description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
++ + "Expression will be fetched, otherwise all files will be fetched")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder()
-- .name("Filter Match Name Only")
- .description(
- "If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison")
- .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename "
- + "in the regex comparison")
++ .name("Filter Match Name Only")
++ .description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename "
++ + "in the regex comparison")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
-- .name("Ignore Dotted Files")
-- .description("If true, files whose names begin with a dot (\".\") will be ignored")
-- .required(true)
-- .allowableValues("true", "false")
-- .defaultValue("true")
-- .build();
++ .name("Ignore Dotted Files")
++ .description("If true, files whose names begin with a dot (\".\") will be ignored")
++ .required(true)
++ .allowableValues("true", "false")
++ .defaultValue("true")
++ .build();
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
-- .name("Minimum File Age")
- .description(
- "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
- .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
-- .required(true)
- .addValidator(
- StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
- .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-- .defaultValue("0 sec")
-- .build();
++ .name("Minimum File Age")
++ .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored")
++ .required(true)
++ .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
++ .defaultValue("0 sec")
++ .build();
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
-- .name("Maximum File Age")
- .description(
- "The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
- .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
-- .required(false)
- .addValidator(
- StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
- .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-- .build();
++ .name("Maximum File Age")
++ .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored")
++ .required(false)
++ .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
++ .build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
-- .name("Batch Size")
-- .description("The maximum number of files to pull in each iteration, based on run schedule.")
-- .required(true)
-- .defaultValue("100")
-- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-- .build();
++ .name("Batch Size")
++ .description("The maximum number of files to pull in each iteration, based on run schedule.")
++ .required(true)
++ .defaultValue("100")
++ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
++ .build();
public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
-- .name("Polling Interval")
-- .description("Indicates how long to wait between performing directory listings")
-- .required(true)
-- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-- .defaultValue("0 sec")
-- .build();
++ .name("Polling Interval")
++ .description("Indicates how long to wait between performing directory listings")
++ .required(true)
++ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
++ .defaultValue("0 sec")
++ .build();
public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
-- .name("IO Buffer Size")
-- .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
-- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-- .build();
++ .name("IO Buffer Size")
++ .description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration")
++ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
++ .build();
private static final Set<Relationship> relationships;
protected static final List<PropertyDescriptor> localProperties;
@@@ -461,11 -463,35 +460,8 @@@
return files;
}
-
-
/**
- * Holder for a snapshot in time of some processor properties that are
- * passed around.
- * Returns the relative path of the child that does not include the filename or the root path.
- *
- * @param root root
- * @param child child
- * @return the relative path of the child that does not include the filename or the root path
- */
- public static String getPathDifference(final Path root, final Path child) {
- final int depthDiff = child.depth() - root.depth();
- if (depthDiff <= 1) {
- return "".intern();
- }
- String lastRoot = root.getName();
- Path childsParent = child.getParent();
- final StringBuilder builder = new StringBuilder();
- builder.append(childsParent.getName());
- for (int i = (depthDiff - 3); i >= 0; i--) {
- childsParent = childsParent.getParent();
- String name = childsParent.getName();
- if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
- break;
- }
- builder.insert(0, Path.SEPARATOR).insert(0, name);
- }
- return builder.toString();
- }
-
- /**
+ * Holder for a snapshot in time of some processor properties that are passed around.
*/
protected static class ProcessorConfiguration {
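
As an aside for readers following the "path" attribute wording above: the behaviour it describes (relative to the configured Directory, with "./" for files sitting directly in it) can be illustrated with a small standalone helper. This is a hedged sketch only — the class and method names are illustrative, and it uses plain java.nio paths rather than the processor's own getPathDifference logic, which this commit removes from GetHDFS.

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class PathAttributeExample {

        // Directory portion of 'file' relative to 'root', or "./" when the file
        // sits directly inside the root - mirroring the documented "path" attribute.
        static String relativePath(final String root, final String file) {
            final Path rootPath = Paths.get(root);
            final Path parent = Paths.get(file).getParent();
            if (rootPath.equals(parent)) {
                return "./";
            }
            return rootPath.relativize(parent).toString();
        }

        public static void main(final String[] args) {
            System.out.println(relativePath("/tmp", "/tmp/data.txt"));           // ./
            System.out.println(relativePath("/tmp", "/tmp/abc/1/2/3/data.txt")); // abc/1/2/3
        }
    }
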
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 707b50d,0000000..56a128a
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@@ -1,466 -1,0 +1,469 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.HDFSListing;
+import org.apache.nifi.processors.hadoop.util.StringSerDe;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
- + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
- + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
- + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
++ + "the HDFS file so that it can be fetched in conjunction with ListHDFS. This Processor is designed to run on Primary Node only "
++ + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
++ + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
+@WritesAttributes({
- @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
- @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
- @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
- @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
- @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
- @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
- @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
- @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r--")
++ @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
++ @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
++ + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
++ + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
++ @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
++ @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
++ @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
++ @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
++ @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
++ @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
++ + "3 for the group, and 3 for other users. For example rw-rw-r--")
+})
+@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class ListHDFS extends AbstractHadoopProcessor {
-
- public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
- .name("Distributed Cache Service")
- .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
- .required(true)
- .identifiesControllerService(DistributedMapCacheClient.class)
- .build();
++ public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
++ .name("Distributed Cache Service")
++ .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
++ + "begins pulling data, it won't duplicate all of the work that has been done.")
++ .required(true)
++ .identifiesControllerService(DistributedMapCacheClient.class)
++ .build();
+
+ public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
- .name(DIRECTORY_PROP_NAME)
- .description("The HDFS directory from which files should be read")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
- .name("Recurse Subdirectories")
- .description("Indicates whether to list files from subdirectories of the HDFS directory")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
-
-
++ .name(DIRECTORY_PROP_NAME)
++ .description("The HDFS directory from which files should be read")
++ .required(true)
++ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
++ .build();
++
++ public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
++ .name("Recurse Subdirectories")
++ .description("Indicates whether to list files from subdirectories of the HDFS directory")
++ .required(true)
++ .allowableValues("true", "false")
++ .defaultValue("true")
++ .build();
++
++
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All FlowFiles are transferred to this relationship")
- .build();
++ .name("success")
++ .description("All FlowFiles are transferred to this relationship")
++ .build();
+
+ private volatile Long lastListingTime = null;
+ private volatile Set<Path> latestPathsListed = new HashSet<>();
+ private volatile boolean electedPrimaryNode = false;
+ private File persistenceFile = null;
-
++
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
- super.init(context);
- persistenceFile = new File("conf/state/" + getIdentifier());
++ super.init(context);
++ persistenceFile = new File("conf/state/" + getIdentifier());
++ }
++
++ @Override
++ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
++ final List<PropertyDescriptor> properties = new ArrayList<>();
++ properties.add(HADOOP_CONFIGURATION_RESOURCES);
++ properties.add(DISTRIBUTED_CACHE_SERVICE);
++ properties.add(DIRECTORY);
++ properties.add(RECURSE_SUBDIRS);
++ return properties;
++ }
++
++ @Override
++ public Set<Relationship> getRelationships() {
++ final Set<Relationship> relationships = new HashSet<>();
++ relationships.add(REL_SUCCESS);
++ return relationships;
++ }
++
++ private String getKey(final String directory) {
++ return getIdentifier() + ".lastListingTime." + directory;
++ }
++
++ @OnPrimaryNodeStateChange
++ public void onPrimaryNodeChange(final PrimaryNodeState newState) {
++ if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
++ electedPrimaryNode = true;
++ }
++ }
++
++ @Override
++ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
++ if ( descriptor.equals(DIRECTORY) ) {
++ lastListingTime = null; // clear lastListingTime so that we have to fetch new time
++ latestPathsListed = new HashSet<>();
++ }
+ }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(HADOOP_CONFIGURATION_RESOURCES);
- properties.add(DISTRIBUTED_CACHE_SERVICE);
- properties.add(DIRECTORY);
- properties.add(RECURSE_SUBDIRS);
- return properties;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- return relationships;
- }
-
- private String getKey(final String directory) {
- return getIdentifier() + ".lastListingTime." + directory;
- }
-
- @OnPrimaryNodeStateChange
- public void onPrimaryNodeChange(final PrimaryNodeState newState) {
- if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
- electedPrimaryNode = true;
- }
- }
-
- @Override
- public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- if ( descriptor.equals(DIRECTORY) ) {
- lastListingTime = null; // clear lastListingTime so that we have to fetch new time
- latestPathsListed = new HashSet<>();
- }
- }
-
- private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
- final ObjectMapper mapper = new ObjectMapper();
++
++ private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
++ final ObjectMapper mapper = new ObjectMapper();
+ final JsonNode jsonNode = mapper.readTree(serializedState);
+ return mapper.readValue(jsonNode, HDFSListing.class);
- }
-
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final String directory = context.getProperty(DIRECTORY).getValue();
-
- // Determine the timestamp for the last file that we've listed.
- Long minTimestamp = lastListingTime;
- if ( minTimestamp == null || electedPrimaryNode ) {
- // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
- // we have performed a listing. In this case,
- // First, attempt to get timestamp from distributed cache service.
- final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
- try {
- final StringSerDe serde = new StringSerDe();
- final String serializedState = client.get(getKey(directory), serde, serde);
- if ( serializedState == null || serializedState.isEmpty() ) {
- minTimestamp = null;
- this.latestPathsListed = Collections.emptySet();
- } else {
- final HDFSListing listing = deserialize(serializedState);
- this.lastListingTime = listing.getLatestTimestamp().getTime();
- minTimestamp = listing.getLatestTimestamp().getTime();
- this.latestPathsListed = listing.toPaths();
- }
-
- this.lastListingTime = minTimestamp;
- electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
- } catch (final IOException ioe) {
- getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
- context.yield();
- return;
- }
-
- // Check the persistence file. We want to use the latest timestamp that we have so that
- // we don't duplicate data.
- try {
- if ( persistenceFile.exists() ) {
- try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- final Properties props = new Properties();
- props.load(fis);
-
- // get the local timestamp for this directory, if it exists.
- final String locallyPersistedValue = props.getProperty(directory);
- if ( locallyPersistedValue != null ) {
- final HDFSListing listing = deserialize(locallyPersistedValue);
- final long localTimestamp = listing.getLatestTimestamp().getTime();
-
- // If distributed state doesn't have an entry or the local entry is later than the distributed state,
- // update the distributed state so that we are in sync.
- if (minTimestamp == null || localTimestamp > minTimestamp) {
- minTimestamp = localTimestamp;
-
- // Our local persistence file shows a later time than the Distributed service.
- // Update the distributed service to match our local state.
- try {
- final StringSerDe serde = new StringSerDe();
- client.put(getKey(directory), locallyPersistedValue, serde, serde);
- } catch (final IOException ioe) {
- getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
- + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
- new Object[] {directory, locallyPersistedValue, ioe});
- }
- }
- }
- }
- }
- } catch (final IOException ioe) {
- getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
- }
- }
-
-
- // Pull in any file that is newer than the timestamp that we have.
- final FileSystem hdfs = hdfsResources.get().getValue();
- final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
- final Path rootPath = new Path(directory);
-
- int listCount = 0;
- Long latestListingModTime = null;
- final Set<FileStatus> statuses;
- try {
- statuses = getStatuses(rootPath, recursive, hdfs);
- for ( final FileStatus status : statuses ) {
- // don't get anything where the last modified timestamp is equal to our current timestamp.
- // if we do, then we run the risk of multiple files having the same last mod date but us only
- // seeing a portion of them.
- // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
- // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
- // modified date not before the current time.
- final long fileModTime = status.getModificationTime();
-
- // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
- // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
- boolean fetch = !status.getPath().getName().endsWith("_COPYING_") &&
- (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
-
- // Create the FlowFile for this path.
- if ( fetch ) {
- final Map<String, String> attributes = createAttributes(status);
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
- session.transfer(flowFile, REL_SUCCESS);
- listCount++;
-
- if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
- latestListingModTime = fileModTime;
- }
- }
- }
- } catch (final IOException ioe) {
- getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
- return;
- }
-
- if ( listCount > 0 ) {
- getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
- session.commit();
-
- // We have performed a listing and pushed the FlowFiles out.
- // Now, we need to persist state about the Last Modified timestamp of the newest file
- // that we pulled in. We do this in order to avoid pulling in the same file twice.
- // However, we want to save the state both locally and remotely.
- // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
- // previously Primary Node left off.
- // We also store the state locally so that if the node is restarted, and the node cannot contact
- // the distributed state cache, the node can continue to run (if it is primary node).
- String serializedState = null;
- try {
- serializedState = serializeState(latestListingModTime, statuses);
- } catch (final Exception e) {
- getLogger().error("Failed to serialize state due to {}", new Object[] {e});
- }
-
- if ( serializedState != null ) {
- // Save our state locally.
- try {
- persistLocalState(directory, serializedState);
- } catch (final IOException ioe) {
- getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
- }
-
- // Attempt to save state to remote server.
- final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
- try {
- client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
- } catch (final IOException ioe) {
- getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
- }
- }
-
- lastListingTime = latestListingModTime;
- } else {
- getLogger().debug("There is no data to list. Yielding.");
- context.yield();
-
- // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
- if ( lastListingTime == null ) {
- lastListingTime = 0L;
- }
-
- return;
- }
- }
-
- private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
- final Set<FileStatus> statusSet = new HashSet<>();
-
- final FileStatus[] statuses = hdfs.listStatus(path);
-
- for ( final FileStatus status : statuses ) {
- if ( status.isDirectory() ) {
- if ( recursive ) {
- try {
- statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
- } catch (final IOException ioe) {
- getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
- }
- }
- } else {
- statusSet.add(status);
- }
- }
-
- return statusSet;
- }
-
-
- private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
- // we need to keep track of all files that we pulled in that had a modification time equal to
- // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
- // that have a mod time equal to that timestamp because more files may come in with the same timestamp
- // later in the same millisecond.
- if ( statuses.isEmpty() ) {
- return null;
- } else {
- final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
- Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
- @Override
- public int compare(final FileStatus o1, final FileStatus o2) {
- return Long.compare(o1.getModificationTime(), o2.getModificationTime());
- }
- });
-
- final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
- final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
- for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
- final FileStatus status = sortedStatuses.get(i);
- if (status.getModificationTime() == latestListingModTime) {
- pathsWithModTimeEqualToListingModTime.add(status.getPath());
- }
- }
-
- this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
-
- final HDFSListing listing = new HDFSListing();
- listing.setLatestTimestamp(new Date(latestListingModTime));
- final Set<String> paths = new HashSet<>();
- for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
- paths.add(path.toUri().toString());
- }
- listing.setMatchingPaths(paths);
-
- final ObjectMapper mapper = new ObjectMapper();
- final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
- return serializedState;
- }
- }
-
- private void persistLocalState(final String directory, final String serializedState) throws IOException {
- // we need to keep track of all files that we pulled in that had a modification time equal to
- // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
- // that have a mod time equal to that timestamp because more files may come in with the same timestamp
- // later in the same millisecond.
- final File dir = persistenceFile.getParentFile();
- if ( !dir.exists() && !dir.mkdirs() ) {
- throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
- }
-
- final Properties props = new Properties();
- if ( persistenceFile.exists() ) {
- try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- props.load(fis);
- }
- }
-
- props.setProperty(directory, serializedState);
-
- try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
- props.store(fos, null);
- }
- }
-
- private String getAbsolutePath(final Path path) {
- final Path parent = path.getParent();
- final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
- return prefix + "/" + path.getName();
- }
-
- private Map<String, String> createAttributes(final FileStatus status) {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
- attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
-
- attributes.put("hdfs.owner", status.getOwner());
- attributes.put("hdfs.group", status.getGroup());
- attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
- attributes.put("hdfs.length", String.valueOf(status.getLen()));
- attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
-
- final FsPermission permission = status.getPermission();
- final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
- attributes.put("hdfs.permissions", perms);
- return attributes;
- }
-
- private String getPerms(final FsAction action) {
- final StringBuilder sb = new StringBuilder();
- if ( action.implies(FsAction.READ) ) {
- sb.append("r");
- } else {
- sb.append("-");
- }
-
- if ( action.implies(FsAction.WRITE) ) {
- sb.append("w");
- } else {
- sb.append("-");
- }
-
- if ( action.implies(FsAction.EXECUTE) ) {
- sb.append("x");
- } else {
- sb.append("-");
- }
-
- return sb.toString();
- }
++ }
++
++
++ @Override
++ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
++ final String directory = context.getProperty(DIRECTORY).getValue();
++
++ // Determine the timestamp for the last file that we've listed.
++ Long minTimestamp = lastListingTime;
++ if ( minTimestamp == null || electedPrimaryNode ) {
++ // We haven't yet restored any state from local or distributed state - or it's been at least a minute since
++ // we have performed a listing. In this case,
++ // First, attempt to get timestamp from distributed cache service.
++ final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
++
++ try {
++ final StringSerDe serde = new StringSerDe();
++ final String serializedState = client.get(getKey(directory), serde, serde);
++ if ( serializedState == null || serializedState.isEmpty() ) {
++ minTimestamp = null;
++ this.latestPathsListed = Collections.emptySet();
++ } else {
++ final HDFSListing listing = deserialize(serializedState);
++ this.lastListingTime = listing.getLatestTimestamp().getTime();
++ minTimestamp = listing.getLatestTimestamp().getTime();
++ this.latestPathsListed = listing.toPaths();
++ }
++
++ this.lastListingTime = minTimestamp;
++ electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
++ } catch (final IOException ioe) {
++ getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
++ context.yield();
++ return;
++ }
++
++ // Check the persistence file. We want to use the latest timestamp that we have so that
++ // we don't duplicate data.
++ try {
++ if ( persistenceFile.exists() ) {
++ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
++ final Properties props = new Properties();
++ props.load(fis);
++
++ // get the local timestamp for this directory, if it exists.
++ final String locallyPersistedValue = props.getProperty(directory);
++ if ( locallyPersistedValue != null ) {
++ final HDFSListing listing = deserialize(locallyPersistedValue);
++ final long localTimestamp = listing.getLatestTimestamp().getTime();
++
++ // If distributed state doesn't have an entry or the local entry is later than the distributed state,
++ // update the distributed state so that we are in sync.
++ if (minTimestamp == null || localTimestamp > minTimestamp) {
++ minTimestamp = localTimestamp;
++
++ // Our local persistence file shows a later time than the Distributed service.
++ // Update the distributed service to match our local state.
++ try {
++ final StringSerDe serde = new StringSerDe();
++ client.put(getKey(directory), locallyPersistedValue, serde, serde);
++ } catch (final IOException ioe) {
++ getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
++ + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
++ new Object[] {directory, locallyPersistedValue, ioe});
++ }
++ }
++ }
++ }
++ }
++ } catch (final IOException ioe) {
++ getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
++ }
++ }
++
++
++ // Pull in any file that is newer than the timestamp that we have.
++ final FileSystem hdfs = hdfsResources.get().getValue();
++ final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
++ final Path rootPath = new Path(directory);
++
++ int listCount = 0;
++ Long latestListingModTime = null;
++ final Set<FileStatus> statuses;
++ try {
++ statuses = getStatuses(rootPath, recursive, hdfs);
++ for ( final FileStatus status : statuses ) {
++ // don't get anything where the last modified timestamp is equal to our current timestamp.
++ // if we do, then we run the risk of multiple files having the same last mod date but us only
++ // seeing a portion of them.
++ // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
++ // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
++ // modified date not before the current time.
++ final long fileModTime = status.getModificationTime();
++
++ // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
++ // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
++ boolean fetch = !status.getPath().getName().endsWith("_COPYING_")
++ && (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
++
++ // Create the FlowFile for this path.
++ if ( fetch ) {
++ final Map<String, String> attributes = createAttributes(status);
++ FlowFile flowFile = session.create();
++ flowFile = session.putAllAttributes(flowFile, attributes);
++ session.transfer(flowFile, REL_SUCCESS);
++ listCount++;
++
++ if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
++ latestListingModTime = fileModTime;
++ }
++ }
++ }
++ } catch (final IOException ioe) {
++ getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
++ return;
++ }
++
++ if ( listCount > 0 ) {
++ getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
++ session.commit();
++
++ // We have performed a listing and pushed the FlowFiles out.
++ // Now, we need to persist state about the Last Modified timestamp of the newest file
++ // that we pulled in. We do this in order to avoid pulling in the same file twice.
++ // However, we want to save the state both locally and remotely.
++ // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
++ // previously Primary Node left off.
++ // We also store the state locally so that if the node is restarted, and the node cannot contact
++ // the distributed state cache, the node can continue to run (if it is primary node).
++ String serializedState = null;
++ try {
++ serializedState = serializeState(latestListingModTime, statuses);
++ } catch (final Exception e) {
++ getLogger().error("Failed to serialize state due to {}", new Object[] {e});
++ }
++
++ if ( serializedState != null ) {
++ // Save our state locally.
++ try {
++ persistLocalState(directory, serializedState);
++ } catch (final IOException ioe) {
++ getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
++ }
++
++ // Attempt to save state to remote server.
++ final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
++ try {
++ client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
++ } catch (final IOException ioe) {
++ getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
++ }
++ }
++
++ lastListingTime = latestListingModTime;
++ } else {
++ getLogger().debug("There is no data to list. Yielding.");
++ context.yield();
++
++ // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
++ if ( lastListingTime == null ) {
++ lastListingTime = 0L;
++ }
++
++ return;
++ }
++ }
++
++ private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
++ final Set<FileStatus> statusSet = new HashSet<>();
++
++ final FileStatus[] statuses = hdfs.listStatus(path);
++
++ for ( final FileStatus status : statuses ) {
++ if ( status.isDirectory() ) {
++ if ( recursive ) {
++ try {
++ statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
++ } catch (final IOException ioe) {
++ getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
++ }
++ }
++ } else {
++ statusSet.add(status);
++ }
++ }
++
++ return statusSet;
++ }
++
++
++ private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
++ // we need to keep track of all files that we pulled in that had a modification time equal to
++ // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
++ // that have a mod time equal to that timestamp because more files may come in with the same timestamp
++ // later in the same millisecond.
++ if ( statuses.isEmpty() ) {
++ return null;
++ } else {
++ final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
++ Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
++ @Override
++ public int compare(final FileStatus o1, final FileStatus o2) {
++ return Long.compare(o1.getModificationTime(), o2.getModificationTime());
++ }
++ });
++
++ final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
++ final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
++ for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
++ final FileStatus status = sortedStatuses.get(i);
++ if (status.getModificationTime() == latestListingModTime) {
++ pathsWithModTimeEqualToListingModTime.add(status.getPath());
++ }
++ }
++
++ this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
++
++ final HDFSListing listing = new HDFSListing();
++ listing.setLatestTimestamp(new Date(latestListingModTime));
++ final Set<String> paths = new HashSet<>();
++ for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
++ paths.add(path.toUri().toString());
++ }
++ listing.setMatchingPaths(paths);
++
++ final ObjectMapper mapper = new ObjectMapper();
++ final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
++ return serializedState;
++ }
++ }
++
++ private void persistLocalState(final String directory, final String serializedState) throws IOException {
++ // we need to keep track of all files that we pulled in that had a modification time equal to
++ // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
++ // that have a mod time equal to that timestamp because more files may come in with the same timestamp
++ // later in the same millisecond.
++ final File dir = persistenceFile.getParentFile();
++ if ( !dir.exists() && !dir.mkdirs() ) {
++ throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
++ }
++
++ final Properties props = new Properties();
++ if ( persistenceFile.exists() ) {
++ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
++ props.load(fis);
++ }
++ }
++
++ props.setProperty(directory, serializedState);
++
++ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
++ props.store(fos, null);
++ }
++ }
++
++ private String getAbsolutePath(final Path path) {
++ final Path parent = path.getParent();
++ final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
++ return prefix + "/" + path.getName();
++ }
++
++ private Map<String, String> createAttributes(final FileStatus status) {
++ final Map<String, String> attributes = new HashMap<>();
++ attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
++ attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
++
++ attributes.put("hdfs.owner", status.getOwner());
++ attributes.put("hdfs.group", status.getGroup());
++ attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
++ attributes.put("hdfs.length", String.valueOf(status.getLen()));
++ attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
++
++ final FsPermission permission = status.getPermission();
++ final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
++ attributes.put("hdfs.permissions", perms);
++ return attributes;
++ }
++
++ private String getPerms(final FsAction action) {
++ final StringBuilder sb = new StringBuilder();
++ if ( action.implies(FsAction.READ) ) {
++ sb.append("r");
++ } else {
++ sb.append("-");
++ }
++
++ if ( action.implies(FsAction.WRITE) ) {
++ sb.append("w");
++ } else {
++ sb.append("-");
++ }
++
++ if ( action.implies(FsAction.EXECUTE) ) {
++ sb.append("x");
++ } else {
++ sb.append("-");
++ }
++
++ return sb.toString();
++ }
+}
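
The "hdfs.permissions" attribute produced by createAttributes(...) above is built character by character from FsAction checks. A minimal sketch of that formatting, assuming only the Hadoop FsPermission/FsAction classes (the example class name is illustrative):

    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PermissionStringExample {

        // Same rwx formatting as the getPerms(...) helper in ListHDFS above.
        static String perms(final FsAction action) {
            return (action.implies(FsAction.READ) ? "r" : "-")
                    + (action.implies(FsAction.WRITE) ? "w" : "-")
                    + (action.implies(FsAction.EXECUTE) ? "x" : "-");
        }

        public static void main(final String[] args) {
            final FsPermission permission = new FsPermission((short) 0664);
            final String formatted = perms(permission.getUserAction())
                    + perms(permission.getGroupAction())
                    + perms(permission.getOtherAction());
            System.out.println(formatted); // rw-rw-r--
        }
    }
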
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
index 9f4d68b,0000000..49957f5
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@@ -1,83 -1,0 +1,83 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
- * we can avoid pulling the same file multiple times
++ * we can avoid pulling the same file multiple times
+ */
+@XmlType(name = "listing")
+public class HDFSListing {
- private Date latestTimestamp;
- private Collection<String> matchingPaths;
-
- /**
- * @return the modification date of the newest file that was contained in the HDFS Listing
- */
- public Date getLatestTimestamp() {
- return latestTimestamp;
- }
-
- /**
- * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
- *
- * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
- */
- public void setLatestTimestamp(Date latestTimestamp) {
- this.latestTimestamp = latestTimestamp;
- }
-
- /**
- * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
- * was equal to {@link #getLatestTimestamp()}
- */
- @XmlTransient
- public Collection<String> getMatchingPaths() {
- return matchingPaths;
- }
-
- /**
- * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
- */
- public Set<Path> toPaths() {
- final Set<Path> paths = new HashSet<>(matchingPaths.size());
- for ( final String pathname : matchingPaths ) {
- paths.add(new Path(pathname));
- }
- return paths;
- }
++ private Date latestTimestamp;
++ private Collection<String> matchingPaths;
+
- /**
- * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
- * equal to {@link #getLatestTimestamp()}
- * @param matchingPaths
- */
- public void setMatchingPaths(Collection<String> matchingPaths) {
- this.matchingPaths = matchingPaths;
- }
++ /**
++ * @return the modification date of the newest file that was contained in the HDFS Listing
++ */
++ public Date getLatestTimestamp() {
++ return latestTimestamp;
++ }
++
++ /**
++ * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
++ *
++ * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
++ */
++ public void setLatestTimestamp(Date latestTimestamp) {
++ this.latestTimestamp = latestTimestamp;
++ }
++
++ /**
++ * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
++ * was equal to {@link #getLatestTimestamp()}
++ */
++ @XmlTransient
++ public Collection<String> getMatchingPaths() {
++ return matchingPaths;
++ }
++
++ /**
++ * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
++ */
++ public Set<Path> toPaths() {
++ final Set<Path> paths = new HashSet<>(matchingPaths.size());
++ for ( final String pathname : matchingPaths ) {
++ paths.add(new Path(pathname));
++ }
++ return paths;
++ }
++
++ /**
++ * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
++ * equal to {@link #getLatestTimestamp()}
++ * @param matchingPaths the paths that have last modified date matching the latest timestamp
++ */
++ public void setMatchingPaths(Collection<String> matchingPaths) {
++ this.matchingPaths = matchingPaths;
++ }
+
+}
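
ListHDFS persists its listing state by serializing this POJO to JSON with the Jackson 1.x ObjectMapper calls shown in serializeState(...) and deserialize(...) above. A minimal round-trip sketch under those assumptions — the example class name and the sample path are illustrative:

    import java.util.Collections;
    import java.util.Date;

    import org.apache.nifi.processors.hadoop.util.HDFSListing;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;

    public class HdfsListingStateExample {

        public static void main(final String[] args) throws Exception {
            // Build a snapshot the way ListHDFS.serializeState(...) does.
            final HDFSListing listing = new HDFSListing();
            listing.setLatestTimestamp(new Date());
            listing.setMatchingPaths(Collections.singleton("hdfs://example/tmp/abc/1/2/3/data.txt"));

            final ObjectMapper mapper = new ObjectMapper();
            final String serialized = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);

            // Restore it the way ListHDFS.deserialize(...) does.
            final JsonNode node = mapper.readTree(serialized);
            final HDFSListing restored = mapper.readValue(node, HDFSListing.class);
            System.out.println(restored.getLatestTimestamp() + " -> " + restored.toPaths());
        }
    }
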
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
index ef0e590,0000000..229f26c
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@@ -1,48 -1,0 +1,48 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
+
- @Override
- public Long deserialize(final byte[] input) throws DeserializationException, IOException {
- if ( input == null || input.length == 0 ) {
- return null;
- }
-
- final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
- return dis.readLong();
- }
++ @Override
++ public Long deserialize(final byte[] input) throws DeserializationException, IOException {
++ if ( input == null || input.length == 0 ) {
++ return null;
++ }
+
- @Override
- public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
- final DataOutputStream dos = new DataOutputStream(out);
- dos.writeLong(value);
- }
++ final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
++ return dis.readLong();
++ }
++
++ @Override
++ public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
++ final DataOutputStream dos = new DataOutputStream(out);
++ dos.writeLong(value);
++ }
+
+}
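
LongSerDe writes a Long as the eight bytes produced by DataOutputStream.writeLong and reads them back with DataInputStream.readLong, returning null for null or empty input. A quick round-trip sketch (not part of this commit):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.nifi.processors.hadoop.util.LongSerDe;

public class LongSerDeSketch {

    public static void main(final String[] args) throws IOException {
        final LongSerDe serde = new LongSerDe();

        // serialize writes the value as 8 big-endian bytes
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        serde.serialize(1430000000000L, out);

        // deserialize reads those bytes back; null or empty input yields null
        System.out.println(serde.deserialize(out.toByteArray())); // 1430000000000
        System.out.println(serde.deserialize(new byte[0]));       // null
    }
}
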
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
index 848831f,0000000..ca1c548
mode 100644,000000..100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@@ -1,44 -1,0 +1,44 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class StringSerDe implements Serializer<String>, Deserializer<String> {
+
- @Override
- public String deserialize(final byte[] value) throws DeserializationException, IOException {
- if ( value == null ) {
- return null;
- }
-
- return new String(value, StandardCharsets.UTF_8);
- }
++ @Override
++ public String deserialize(final byte[] value) throws DeserializationException, IOException {
++ if ( value == null ) {
++ return null;
++ }
+
- @Override
- public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- out.write(value.getBytes(StandardCharsets.UTF_8));
- }
++ return new String(value, StandardCharsets.UTF_8);
++ }
++
++ @Override
++ public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
++ out.write(value.getBytes(StandardCharsets.UTF_8));
++ }
+
+}
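
StringSerDe is the String counterpart: serialize writes the raw UTF-8 bytes and deserialize decodes them, with null input mapping to null. A quick round-trip sketch (not part of this commit):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.nifi.processors.hadoop.util.StringSerDe;

public class StringSerDeSketch {

    public static void main(final String[] args) throws IOException {
        final StringSerDe serde = new StringSerDe();

        // serialize writes the String's UTF-8 bytes to the stream
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        serde.serialize("hdfs://namenode:8020/tmp/file.txt", out);

        // deserialize decodes the bytes back into a String; null input yields null
        System.out.println(serde.deserialize(out.toByteArray()));
        System.out.println(serde.deserialize(null)); // null
    }
}
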
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index 975dc63,d816e8c..ea3bb63
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@@ -83,31 -86,15 +86,34 @@@ public interface DistributedMapCacheCli
<K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException;
/**
- * @param <K> type of key
- * @param <V> type of value
+ * Adds the specified key and value to the cache, overwriting any value that is
+ * currently set.
- *
++ *
++ * @param <K> the key type
++ * @param <V> the value type
+ * @param key The key to set
+ * @param value The value to associate with the given Key
+ * @param keySerializer the Serializer that will be used to serialize the key into bytes
+ * @param valueSerializer the Serializer that will be used to serialize the value into bytes
- *
++ *
+ * @throws IOException if unable to communicate with the remote instance
+ * @throws NullPointerException if the key or either serializer is null
+ */
+ <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
-
++
+ /**
+ * Returns the value in the cache for the given key, if one exists;
+ * otherwise returns <code>null</code>
+ *
- * @param <K>
- * @param <V>
++ * @param <K> the key type
++ * @param <V> the value type
* @param key the key to lookup in the map
- * @param keySerializer
- * @param valueDeserializer
+ * @param keySerializer key serializer
+ * @param valueDeserializer value deserializer
*
- * @return
- * @throws IOException
+ * @return the value in the cache for the given key, if one exists;
+ * otherwise returns <code>null</code>
+ * @throws IOException if unable to communicate with the remote instance
*/
<K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
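
The new put method gives callers an unconditional overwrite alongside the existing putIfAbsent and get operations. Below is a minimal sketch of how put and get pair up, using the two SerDe classes from this same commit; the client instance is assumed to be an already-configured DistributedMapCacheClient (in NiFi it is normally obtained as a Controller Service), so this is an illustration rather than actual processor code:

import java.io.IOException;

import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processors.hadoop.util.LongSerDe;
import org.apache.nifi.processors.hadoop.util.StringSerDe;

public class MapCacheClientSketch {

    // `client` is assumed to be supplied by the caller, already configured
    static Long storeAndFetch(final DistributedMapCacheClient client) throws IOException {
        final StringSerDe keySerDe = new StringSerDe();
        final LongSerDe valueSerDe = new LongSerDe();

        // put(...) overwrites whatever value is currently associated with the key
        client.put("last-listing-timestamp", 1430000000000L, keySerDe, valueSerDe);

        // get(...) returns the stored value, or null if the key is absent
        return client.get("last-listing-timestamp", keySerDe, valueSerDe);
    }
}
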
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 8903046,fad0adb..e9c6f1d
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@@ -22,9 -22,12 +22,14 @@@ import java.nio.ByteBuffer
public interface MapCache {
MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+
+ MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException;
++
boolean containsKey(ByteBuffer key) throws IOException;
+
ByteBuffer get(ByteBuffer key) throws IOException;
+
ByteBuffer remove(ByteBuffer key) throws IOException;
+
void shutdown() throws IOException;
}
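
The MapCache interface now exposes an unconditional put next to putIfAbsent. A short sketch of the difference, against an arbitrary implementation (the cache instance is assumed to be one of the implementations shown further down in this commit, such as SimpleMapCache or PersistentMapCache):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.distributed.cache.server.map.MapCache;

public class MapCacheSketch {

    static void exercise(final MapCache cache) throws IOException {
        final ByteBuffer key = ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8));
        final ByteBuffer first = ByteBuffer.wrap("first".getBytes(StandardCharsets.UTF_8));
        final ByteBuffer second = ByteBuffer.wrap("second".getBytes(StandardCharsets.UTF_8));

        cache.putIfAbsent(key, first); // stores "first" only because no value exists yet
        cache.put(key, second);        // the new method: unconditionally replaces "first"
        cache.get(key);                // now returns the bytes of "second"
        cache.remove(key);
    }
}
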
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index cf8996c,943d6aa..13ed0df
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@@ -55,70 -55,63 +55,70 @@@ public class MapCacheServer extends Abs
final String action = dis.readUTF();
try {
switch (action) {
- case "close": {
- return false;
- }
- case "putIfAbsent": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
- final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- dos.writeBoolean(putResult.isSuccessful());
- break;
- }
- case "containsKey": {
- final byte[] key = readValue(dis);
- final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
- dos.writeBoolean(contains);
- break;
- }
- case "getAndPutIfAbsent": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
-
- final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
- if (putResult.isSuccessful()) {
- // Put was successful. There was no old value to get.
- dos.writeInt(0);
- } else {
- // we didn't put. Write back the previous value
- final byte[] byteArray = putResult.getExistingValue().array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "get": {
- final byte[] key = readValue(dis);
- final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
- if (existingValue == null) {
- // there was no existing value; we did a "put".
- dos.writeInt(0);
- } else {
- // a value already existed. we did not update the map
- final byte[] byteArray = existingValue.array();
- dos.writeInt(byteArray.length);
- dos.write(byteArray);
- }
-
- break;
- }
- case "remove": {
- final byte[] key = readValue(dis);
- final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
- dos.writeBoolean(removed);
- break;
+ case "close": {
+ return false;
+ }
+ case "putIfAbsent": {
+ final byte[] key = readValue(dis);
+ final byte[] value = readValue(dis);
+ final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+ dos.writeBoolean(putResult.isSuccessful());
+ break;
+ }
+ case "put": {
- final byte[] key = readValue(dis);
- final byte[] value = readValue(dis);
- cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
++ final byte[] key = readValue(dis);
++ final byte[] value = readValue(dis);
++ cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+ dos.writeBoolean(true);
- break;
++ break;
+ }
+ case "containsKey": {
+ final byte[] key = readValue(dis);
+ final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
+ dos.writeBoolean(contains);
+ break;
+ }
+ case "getAndPutIfAbsent": {
+ final byte[] key = readValue(dis);
+ final byte[] value = readValue(dis);
+
+ final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+ if (putResult.isSuccessful()) {
+ // Put was successful. There was no old value to get.
+ dos.writeInt(0);
+ } else {
+ // we didn't put. Write back the previous value
+ final byte[] byteArray = putResult.getExistingValue().array();
+ dos.writeInt(byteArray.length);
+ dos.write(byteArray);
}
- default: {
- throw new IOException("Illegal Request");
+
+ break;
+ }
+ case "get": {
+ final byte[] key = readValue(dis);
+ final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
+ if (existingValue == null) {
+ // no value exists for this key; indicate that by writing a zero length
+ dos.writeInt(0);
+ } else {
+ // a value exists; write back its length followed by the value itself
+ final byte[] byteArray = existingValue.array();
+ dos.writeInt(byteArray.length);
+ dos.write(byteArray);
}
+
+ break;
+ }
+ case "remove": {
+ final byte[] key = readValue(dis);
+ final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
+ dos.writeBoolean(removed);
+ break;
+ }
+ default: {
+ throw new IOException("Illegal Request");
+ }
}
} finally {
dos.flush();
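
The server loop above dispatches on an action name read with readUTF, reads the key (and, for put, the value) via readValue, and answers the new "put" action with a single boolean. For orientation, here is a rough sketch of what the client side of that exchange could look like; it assumes readValue(...) expects an int length followed by that many bytes, mirroring the writeInt/write framing the server uses for its own responses, and it is an illustration only, not the actual NiFi client implementation:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class PutActionClientSketch {

    static boolean sendPut(final DataOutputStream dos, final DataInputStream dis,
            final byte[] key, final byte[] value) throws IOException {
        dos.writeUTF("put");        // action name matched by the switch above
        dos.writeInt(key.length);   // assumed length-prefixed framing for readValue
        dos.write(key);
        dos.writeInt(value.length);
        dos.write(value);
        dos.flush();
        return dis.readBoolean();   // the server answers true after storing the value
    }
}
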
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 82b1787,e821fbf..663f441
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@@ -75,33 -75,9 +75,33 @@@ public class PersistentMapCache impleme
wali.checkpoint();
}
}
-
+
return putResult;
}
+
+ @Override
+ public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
+ final MapPutResult putResult = wrapped.put(key, value);
+ if ( putResult.isSuccessful() ) {
+ // The put was successful.
+ final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
+ final List<MapWaliRecord> records = new ArrayList<>();
+ records.add(record);
+
+ if ( putResult.getEvictedKey() != null ) {
+ records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
+ }
+
+ wali.update(Collections.singletonList(record), false);
+
+ final long modCount = modifications.getAndIncrement();
+ if ( modCount > 0 && modCount % 100000 == 0 ) {
+ wali.checkpoint();
+ }
+ }
+
+ return putResult;
+ }
@Override
public boolean containsKey(final ByteBuffer key) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index d8f9c45,9e8bbd1..b167c62
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@@ -105,29 -106,7 +106,29 @@@ public class SimpleMapCache implements
writeLock.unlock();
}
}
-
+
+
+ @Override
+ public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
+ writeLock.lock();
+ try {
- // evict if we need to in order to make room for a new entry.
++ // evict if we need to in order to make room for a new entry.
+ final MapCacheRecord evicted = evict();
+
+ final MapCacheRecord record = new MapCacheRecord(key, value);
- final MapCacheRecord existing = cache.put(key, record);
- inverseCacheMap.put(record, key);
-
- final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
- final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
- final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
-
- return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
++ final MapCacheRecord existing = cache.put(key, record);
++ inverseCacheMap.put(record, key);
++
++ final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
++ final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
++ final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
++
++ return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
+ } finally {
+ writeLock.unlock();
+ }
+ }
-
++
@Override
public boolean containsKey(final ByteBuffer key) {
readLock.lock();