Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/28 16:53:36 UTC
[4/7] incubator-nifi git commit: NIFI-533: Refactored for Unit Tests and added unit tests for ListHDFS
NIFI-533: Refactored for Unit Tests and added unit tests for ListHDFS
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e4f43156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e4f43156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e4f43156
Branch: refs/heads/ListHDFS
Commit: e4f431561e6af7a603a5c8b7a82910f28dc6d600
Parents: dc7f7a8
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Apr 28 08:46:38 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Apr 28 08:46:38 2015 -0400
----------------------------------------------------------------------
.../hadoop/AbstractHadoopProcessor.java | 24 +-
.../hadoop/CreateHadoopSequenceFile.java | 2 +-
.../nifi/processors/hadoop/FetchHDFS.java | 2 +-
.../apache/nifi/processors/hadoop/GetHDFS.java | 10 +-
.../processors/hadoop/GetHDFSSequenceFile.java | 12 +-
.../apache/nifi/processors/hadoop/ListHDFS.java | 42 ++-
.../apache/nifi/processors/hadoop/PutHDFS.java | 21 +-
.../nifi/processors/hadoop/TestListHDFS.java | 347 +++++++++++++++++++
8 files changed, 416 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 91c21a0..355950f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -70,7 +70,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
// variables shared by all threads of this processor
// Hadoop Configuration and FileSystem
- protected final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
+ private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
@Override
protected void init(ProcessorInitializationContext context) {
@@ -153,7 +153,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
config.set(disableCacheName, "true");
- final FileSystem fs = FileSystem.get(config);
+ final FileSystem fs = getFileSystem(config);
getLogger().info(
"Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
new Object[]{fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)),
@@ -165,6 +165,18 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
}
+ /**
+ * This exists in order to allow unit tests to override it so that they don't take several minutes waiting
+ * for UDP packets to be received
+ *
+ * @param config the configuration to use
+ * @return the FileSystem that is created for the given Configuration
+ * @throws IOException if unable to create the FileSystem
+ */
+ protected FileSystem getFileSystem(final Configuration config) throws IOException {
+ return FileSystem.get(config);
+ }
+
/*
* Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
*/
@@ -243,4 +255,12 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
}
return builder.toString();
}
+
+ protected Configuration getConfiguration() {
+ return hdfsResources.get().getKey();
+ }
+
+ protected FileSystem getFileSystem() {
+ return hdfsResources.get().getValue();
+ }
}
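The new protected getFileSystem(Configuration) hook is what makes the unit tests below possible: a subclass can return a mock FileSystem instead of letting FileSystem.get(config) open real connections. A minimal sketch of the pattern (class and field names here are illustrative, not part of this commit; TestListHDFS below shows the real usage):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ListHDFSWithStubbedFileSystem extends ListHDFS {
        // hypothetical mock; TestListHDFS defines its own MockFileSystem
        private final FileSystem stubFs = new MockFileSystem();

        @Override
        protected FileSystem getFileSystem(final Configuration config) throws IOException {
            return stubFs; // bypasses FileSystem.get(config) and its connection timeouts
        }

        @Override
        protected FileSystem getFileSystem() {
            return stubFs; // also bypasses the shared hdfsResources reference
        }
    }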
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index f462277..186a290 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -156,7 +156,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
try {
- flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
+ flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
} catch (ProcessException e) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index b5efce0..4a52fb7 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -99,7 +99,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
return;
}
- final FileSystem hdfs = hdfsResources.get().getValue();
+ final FileSystem hdfs = getFileSystem();
final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
final URI uri = path.toUri();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index f7894d9..7aa534f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -236,8 +236,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
abstractOnScheduled(context);
// copy configuration values to pass them around cleanly
processorConfig = new ProcessorConfiguration(context);
- FileSystem fs = hdfsResources.get().getValue();
- Path dir = new Path(context.getProperty(DIRECTORY).getValue());
+ final FileSystem fs = getFileSystem();
+ final Path dir = new Path(context.getProperty(DIRECTORY).getValue());
if (!fs.exists(dir)) {
throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
}
@@ -330,8 +330,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
// process the batch of files
FSDataInputStream stream = null;
- Configuration conf = hdfsResources.get().getKey();
- FileSystem hdfs = hdfsResources.get().getValue();
+ Configuration conf = getConfiguration();
+ FileSystem hdfs = getFileSystem();
final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
@@ -398,7 +398,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
try {
- final FileSystem hdfs = hdfsResources.get().getValue();
+ final FileSystem hdfs = getFileSystem();
// get listing
listing = selectFiles(hdfs, processorConfig.getConfiguredRootDirPath(), null);
lastPollTime.set(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 22ba36b..f032ee4 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -22,6 +22,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -34,10 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
/**
* This processor is used to pull files from HDFS. The files being pulled in MUST be SequenceFile formatted files. The processor creates a flow file for each key/value entry in the ingested
@@ -80,9 +79,8 @@ public class GetHDFSSequenceFile extends GetHDFS {
@Override
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
- final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get();
- final Configuration conf = hadoopResources.getKey();
- final FileSystem hdfs = hadoopResources.getValue();
+ final Configuration conf = getConfiguration();
+ final FileSystem hdfs = getFileSystem();
final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 56a128a..151cbf2 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -118,12 +118,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
private volatile Long lastListingTime = null;
private volatile Set<Path> latestPathsListed = new HashSet<>();
private volatile boolean electedPrimaryNode = false;
- private File persistenceFile = null;
@Override
protected void init(final ProcessorInitializationContext context) {
super.init(context);
- persistenceFile = new File("conf/state/" + getIdentifier());
+ }
+
+ protected File getPersistenceFile() {
+ return new File("conf/state/" + getIdentifier());
}
@Override
@@ -143,7 +145,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
return relationships;
}
- private String getKey(final String directory) {
+ protected String getKey(final String directory) {
return getIdentifier() + ".lastListingTime." + directory;
}
@@ -169,18 +171,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final String directory = context.getProperty(DIRECTORY).getValue();
-
+ private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException {
// Determine the timestamp for the last file that we've listed.
Long minTimestamp = lastListingTime;
if ( minTimestamp == null || electedPrimaryNode ) {
// We haven't yet restored any state from local or distributed state - or it's been at least a minute since
// we have performed a listing. In this case,
// First, attempt to get timestamp from distributed cache service.
- final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
try {
final StringSerDe serde = new StringSerDe();
final String serializedState = client.get(getKey(directory), serde, serde);
@@ -197,14 +194,13 @@ public class ListHDFS extends AbstractHadoopProcessor {
this.lastListingTime = minTimestamp;
electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
} catch (final IOException ioe) {
- getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
- context.yield();
- return;
+ throw ioe;
}
// Check the persistence file. We want to use the latest timestamp that we have so that
// we don't duplicate data.
try {
+ final File persistenceFile = getPersistenceFile();
if ( persistenceFile.exists() ) {
try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
final Properties props = new Properties();
@@ -240,9 +236,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
}
+ return minTimestamp;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final String directory = context.getProperty(DIRECTORY).getValue();
+ final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+ final Long minTimestamp;
+ try {
+ minTimestamp = getMinTimestamp(directory, client);
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+ context.yield();
+ return;
+ }
// Pull in any file that is newer than the timestamp that we have.
- final FileSystem hdfs = hdfsResources.get().getValue();
+ final FileSystem hdfs = getFileSystem();
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
final Path rootPath = new Path(directory);
@@ -311,7 +323,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
// Attempt to save state to remote server.
- final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
try {
client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
} catch (final IOException ioe) {
@@ -397,11 +408,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
}
- private void persistLocalState(final String directory, final String serializedState) throws IOException {
+ protected void persistLocalState(final String directory, final String serializedState) throws IOException {
// we need to keep track of all files that we pulled in that had a modification time equal to
// lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
// that have a mod time equal to that timestamp because more files may come in with the same timestamp
// later in the same millisecond.
+ final File persistenceFile = getPersistenceFile();
final File dir = persistenceFile.getParentFile();
if ( !dir.exists() && !dir.mkdirs() ) {
throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
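ListHDFS gets the same treatment: getPersistenceFile(), getKey(), and persistLocalState() are now overridable, so a test can redirect local state to a scratch directory and observe when state is written. A minimal sketch of such a subclass (mirroring ListHDFSWithMockedFileSystem in the new test file below; the flag name is illustrative):

    import java.io.File;
    import java.io.IOException;

    public class ListHDFSForTesting extends ListHDFS {
        volatile boolean localStateSaved = false;

        @Override
        protected File getPersistenceFile() {
            // keep test state under target/ instead of conf/state/<identifier>
            return new File("target/conf/state-file");
        }

        @Override
        protected void persistLocalState(final String directory, final String serializedState) throws IOException {
            super.persistLocalState(directory, serializedState);
            localStateSaved = true; // lets the test assert that local state was saved
        }
    }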
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 057f786..52cf475 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@@ -54,7 +54,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
/**
* This processor copies FlowFiles to HDFS.
@@ -183,8 +182,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
} else {
dfsUmask = FsPermission.DEFAULT_UMASK;
}
- final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
- final Configuration conf = resources.getKey();
+ final Configuration conf = getConfiguration();
FsPermission.setUMask(conf, new FsPermission(dfsUmask));
}
@@ -195,26 +193,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
return;
}
- final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
- if (resources == null || resources.getKey() == null || resources.getValue() == null) {
+ final Configuration configuration = getConfiguration();
+ final FileSystem hdfs = getFileSystem();
+ if (configuration == null || hdfs == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, REL_FAILURE);
context.yield();
return;
}
- final Configuration conf = resources.getKey();
- final FileSystem hdfs = resources.getValue();
- final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
- .getValue());
+ final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
- final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
- BUFFER_SIZE_DEFAULT);
+ final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
@@ -230,7 +225,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
// Create destination directory if it does not exist
try {
- if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+ if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
throw new IOException(configuredRootDirPath.toString() + " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e4f43156/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
new file mode 100644
index 0000000..499fd51
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestListHDFS {
+
+ private TestRunner runner;
+ private ListHDFSWithMockedFileSystem proc;
+ private MockCacheClient service;
+
+ @Before
+ public void setup() throws InitializationException {
+ proc = new ListHDFSWithMockedFileSystem();
+ runner = TestRunners.newTestRunner(proc);
+
+ service = new MockCacheClient();
+ runner.addControllerService("service", service);
+ runner.enableControllerService(service);
+
+ runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
+ runner.setProperty(ListHDFS.DIRECTORY, "/test");
+ runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
+ }
+
+ @Test
+ public void testListingHasCorrectAttributes() {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("path", "/test");
+ mff.assertAttributeEquals("filename", "testFile.txt");
+ }
+
+
+ @Test
+ public void testRecursive() {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+ final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff1.assertAttributeEquals("path", "/test");
+ mff1.assertAttributeEquals("filename", "testFile.txt");
+
+ final MockFlowFile mff2 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(1);
+ mff2.assertAttributeEquals("path", "/test/testDir");
+ mff2.assertAttributeEquals("filename", "1.txt");
+ }
+
+ @Test
+ public void testNotRecursive() {
+ runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+ final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff1.assertAttributeEquals("path", "/test");
+ mff1.assertAttributeEquals("filename", "testFile.txt");
+ }
+
+
+ @Test
+ public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+ final MockFlowFile mff1 = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+ mff1.assertAttributeEquals("path", "/test");
+ mff1.assertAttributeEquals("filename", "testFile.txt");
+
+ runner.clearTransferState();
+
+ // add new file to pull
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
+
+ // trigger primary node change
+ proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+ // cause calls to service to fail
+ service.failOnCalls = true;
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+ final String key = proc.getKey("/test");
+
+ // wait just a bit to ensure that the timestamp changes when we update the service
+ final Object curVal = service.values.get(key);
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ }
+
+ service.failOnCalls = false;
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+ // ensure state saved both locally & remotely
+ assertTrue(proc.localStateSaved);
+ assertNotNull(service.values.get(key));
+ assertNotSame(curVal, service.values.get(key));
+ }
+
+
+ private FsPermission create777() {
+ return new FsPermission((short) 0777);
+ }
+
+
+ private class ListHDFSWithMockedFileSystem extends ListHDFS {
+ private final MockFileSystem fileSystem = new MockFileSystem();
+ private boolean localStateSaved = false;
+
+ @Override
+ protected FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ @Override
+ protected File getPersistenceFile() {
+ return new File("target/conf/state-file");
+ }
+
+ @Override
+ protected FileSystem getFileSystem(Configuration config) throws IOException {
+ return fileSystem;
+ }
+
+ @Override
+ protected void persistLocalState(String directory, String serializedState) throws IOException {
+ super.persistLocalState(directory, serializedState);
+ localStateSaved = true;
+ }
+ }
+
+
+ private class MockFileSystem extends FileSystem {
+ private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+
+ public void addFileStatus(final Path parent, final FileStatus child) {
+ Set<FileStatus> children = fileStatuses.get(parent);
+ if ( children == null ) {
+ children = new HashSet<>();
+ fileStatuses.put(parent, children);
+ }
+
+ children.add(child);
+ }
+
+
+ @Override
+ public long getDefaultBlockSize() {
+ return 1024L;
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+ @Override
+ public URI getUri() {
+ return null;
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ final Set<FileStatus> statuses = fileStatuses.get(f);
+ if ( statuses == null ) {
+ return new FileStatus[0];
+ }
+
+ return statuses.toArray(new FileStatus[statuses.size()]);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path(new File(".").getAbsolutePath());
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return null;
+ }
+
+ }
+
+
+ private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
+ private ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
+ private boolean failOnCalls = false;
+
+ private void verifyNotFail() throws IOException {
+ if ( failOnCalls ) {
+ throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
+ }
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ verifyNotFail();
+ final Object retValue = values.putIfAbsent(key, value);
+ return (retValue == null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+ verifyNotFail();
+ return (V) values.putIfAbsent(key, value);
+ }
+
+ @Override
+ public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+ verifyNotFail();
+ return values.containsKey(key);
+ }
+
+ @Override
+ public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ verifyNotFail();
+ values.put(key, value);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ verifyNotFail();
+ return (V) values.get(key);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+ verifyNotFail();
+ values.remove(key);
+ return true;
+ }
+ }
+}
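Assuming the standard Maven Surefire setup used by the rest of the build, the new test class should be runnable on its own from the nifi-hdfs-processors module with something like:

    mvn test -Dtest=TestListHDFS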