Posted to commits@apex.apache.org by is...@apache.org on 2016/03/17 08:57:22 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-2008: HDFS File Input module
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master 51a19e1be -> becee7f82
APEXMALHAR-2008: HDFS File Input module
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f9fe3d5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f9fe3d5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f9fe3d5e
Branch: refs/heads/master
Commit: f9fe3d5e9e5ef06ebba313a9a09fa268ec9ead7c
Parents: d3a7063
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Fri Mar 11 13:22:10 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Wed Mar 16 15:07:08 2016 +0530
----------------------------------------------------------------------
.../datatorrent/lib/io/block/BlockMetadata.java | 31 ++-
.../datatorrent/lib/io/block/BlockReader.java | 66 +++++
.../lib/io/fs/AbstractFileSplitter.java | 45 +++-
.../lib/io/fs/FileSplitterInput.java | 81 +++++--
.../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++
.../datatorrent/lib/io/fs/HDFSInputModule.java | 243 +++++++++++++++++++
.../lib/io/fs/FileSplitterInputTest.java | 2 +-
.../lib/io/fs/HDFSInputModuleAppTest.java | 221 +++++++++++++++++
8 files changed, 786 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 534024d..6e38e45 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -196,11 +196,13 @@ public interface BlockMetadata
}
/**
- * A block of file which contains file path adn other block properties.
+ * A block of a file, which contains the file path and other block properties.
+ * It also controls whether blocks should be read in sequence.
*/
class FileBlockMetadata extends AbstractBlockMetadata
{
private final String filePath;
+ private boolean readBlockInSequence = false;
protected FileBlockMetadata()
{
@@ -225,10 +227,37 @@ public interface BlockMetadata
return filePath;
}
+ /**
+ * Gets whether blocks should be read in sequence
+ * @return readBlockInSequence
+ */
+ public boolean isReadBlockInSequence()
+ {
+ return readBlockInSequence;
+ }
+
+ /**
+ * Sets whether blocks should be read in sequence
+ * @param readBlockInSequence
+ */
+ public void setReadBlockInSequence(boolean readBlockInSequence)
+ {
+ this.readBlockInSequence = readBlockInSequence;
+ }
+
public FileBlockMetadata newInstance(@NotNull String filePath)
{
Preconditions.checkNotNull(filePath);
return new FileBlockMetadata(filePath);
}
+
+ @Override
+ public int hashCode()
+ {
+ if (isReadBlockInSequence()) {
+ return getFilePath().hashCode();
+ }
+ return super.hashCode();
+ }
}
}
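The hashCode() override above is the hook for sequential reading: when readBlockInSequence is set, every block of a file hashes to its file path, so a hash-based stream partitioner routes all blocks of one file to the same reader partition. A minimal sketch of the effect (hypothetical path, using the FileBlockMetadata(String) constructor exercised elsewhere in this commit):

  import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;

  public class SequentialHashSketch
  {
    public static void main(String[] args)
    {
      FileBlockMetadata block = new FileBlockMetadata("/data/in/file1.txt");
      block.setReadBlockInSequence(true);
      // hashCode() now delegates to getFilePath().hashCode(), so every block
      // of /data/in/file1.txt produces the same value and a hash-based
      // partitioner keeps all of the file's blocks on one reader partition.
      System.out.println(block.hashCode() == "/data/in/file1.txt".hashCode()); // true
    }
  }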
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
new file mode 100644
index 0000000..f4f7d76
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.block;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * BlockReader extends {@link FSSliceReader} to accept a case-insensitive URI
+ */
+public class BlockReader extends FSSliceReader
+{
+ @AutoMetric
+ private long bytesRead;
+
+ protected String uri;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ bytesRead = 0;
+ }
+
+ @Override
+ protected FileSystem getFSInstance() throws IOException
+ {
+ return FileSystem.newInstance(URI.create(uri), configuration);
+ }
+
+ /**
+ * Sets the uri
+ *
+ * @param uri of form hdfs://hostname:port/path/to/input
+ */
+ public void setUri(String uri)
+ {
+ this.uri = uri;
+ }
+
+ public String getUri()
+ {
+ return uri;
+ }
+
+}
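A hedged usage sketch (operator name, URI, and the surrounding populateDAG context are assumptions): the reader is added to a DAG and pointed at the input file system, and getFSInstance() then builds the FileSystem from the configured URI rather than from each block's file path.

  // Inside an application's populateDAG(DAG dag, Configuration conf):
  BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
  blockReader.setUri("hdfs://namenode:8020/path/to/input");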
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index cd47d48..b39168c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.lib.io.fs;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@@ -27,12 +28,10 @@ import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
-
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
@@ -206,6 +205,13 @@ public abstract class AbstractFileSplitter extends BaseOperator
fileMetadata.setDirectory(status.isDirectory());
fileMetadata.setFileLength(status.getLen());
+ if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
+ fileMetadata.setRelativePath(status.getPath().getName());
+ } else {
+ String relativePath = getRelativePathWithFolderName(fileInfo);
+ fileMetadata.setRelativePath(relativePath);
+ }
+
if (!status.isDirectory()) {
int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
if (fileMetadata.getDataOffset() >= status.getLen()) {
@@ -217,6 +223,15 @@ public abstract class AbstractFileSplitter extends BaseOperator
return fileMetadata;
}
+ /*
+ * When a folder name was given as the copy input, prefix the folder name to the sub-items being copied.
+ */
+ private String getRelativePathWithFolderName(FileInfo fileInfo)
+ {
+ String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
+ return parentDir + File.separator + fileInfo.getRelativeFilePath();
+ }
+
/**
* This can be over-ridden to create file metadata of type that extends {@link FileSplitterInput.FileMetadata}
*
@@ -346,6 +361,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
private long discoverTime;
private long[] blockIds;
private boolean isDirectory;
+ private String relativePath;
@SuppressWarnings("unused")
protected FileMetadata()
@@ -493,6 +509,31 @@ public abstract class AbstractFileSplitter extends BaseOperator
{
return isDirectory;
}
+
+ /**
+ * Gets the relative file path
+ * @return relativePath
+ */
+ public String getRelativePath()
+ {
+ return relativePath;
+ }
+
+ /**
+ * Sets the relative file path
+ * @param relativePath
+ */
+ public void setRelativePath(String relativePath)
+ {
+ this.relativePath = relativePath;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "FileMetadata [fileName=" + fileName + ", numberOfBlocks=" + numberOfBlocks + ", isDirectory=" + isDirectory + ", relativePath=" + relativePath + "]";
+ }
+
}
/**
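To illustrate the relative-path rule introduced above (paths are hypothetical), this sketch mirrors getRelativePathWithFolderName(): when a directory is given as input, the parent folder name is prefixed; a file given directly as input keeps only its name.

  import java.io.File;
  import org.apache.hadoop.fs.Path;

  public class RelativePathSketch
  {
    public static void main(String[] args)
    {
      // Directory input files=/data/in, discovered child /data/in/sub/file1.txt:
      String parentDir = new Path("/data/in").getName();  // "in"
      String relativePath = parentDir + File.separator + "sub/file1.txt";
      System.out.println(relativePath);  // in/sub/file1.txt
      // A file given directly as input gets just its name, e.g. "file2.txt".
    }
  }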
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index ab70047..1d8248f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -292,6 +292,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
private transient ScannedFileInfo lastScannedInfo;
private transient int numDiscoveredPerIteration;
+ @NotNull
+ protected final Map<String, Map<String, Long>> inputDirTolastModifiedTimes;
+
public TimeBasedDirectoryScanner()
{
recursive = true;
@@ -301,6 +304,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
discoveredFiles = new LinkedBlockingDeque<>();
atomicThrowable = new AtomicReference<>();
ignoredFiles = Sets.newHashSet();
+ inputDirTolastModifiedTimes = Maps.newHashMap();
}
@Override
@@ -360,7 +364,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
lastScannedInfo = null;
numDiscoveredPerIteration = 0;
for (String afile : files) {
- scan(new Path(afile), null);
+ Map<String, Long> lastModifiedTimesForInputDir;
+ lastModifiedTimesForInputDir = getLastModifiedTimeMap(afile);
+ scan(new Path(afile), null, lastModifiedTimesForInputDir);
}
scanIterationComplete();
} else {
@@ -375,6 +381,15 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
}
+ private Map<String, Long> getLastModifiedTimeMap(String key)
+ {
+ if (inputDirTolastModifiedTimes.get(key) == null) {
+ Map<String, Long> modifiedTimeMap = Maps.newHashMap();
+ inputDirTolastModifiedTimes.put(key, modifiedTimeMap);
+ }
+ return inputDirTolastModifiedTimes.get(key);
+ }
+
/**
* Operations that need to be done once a scan is complete.
*/
@@ -386,6 +401,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected void scan(@NotNull Path filePath, Path rootPath)
{
+ Map<String, Long> lastModifiedTimesForInputDir;
+ lastModifiedTimesForInputDir = getLastModifiedTimeMap(filePath.toUri().getPath());
+ scan(filePath, rootPath, lastModifiedTimesForInputDir);
+ }
+
+ private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir)
+ {
try {
FileStatus parentStatus = fs.getFileStatus(filePath);
String parentPathStr = filePath.toUri().getPath();
@@ -393,27 +415,22 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
LOG.debug("scan {}", parentPathStr);
FileStatus[] childStatuses = fs.listStatus(filePath);
- for (FileStatus status : childStatuses) {
- Path childPath = status.getPath();
- ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath);
-
- if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) {
- continue;
- }
- if (status.isDirectory()) {
- if (recursive) {
- scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
- }
- }
+ if (childStatuses.length == 0 && rootPath == null && lastModifiedTimesForInputDir.get(parentPathStr) == null) { // empty input directory, copied as is
+ ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime());
+ processDiscoveredFile(info);
+ lastModifiedTimesForInputDir.put(parentPathStr, parentStatus.getModificationTime());
+ }
+ for (FileStatus childStatus : childStatuses) {
+ Path childPath = childStatus.getPath();
String childPathStr = childPath.toUri().getPath();
- if (ignoredFiles.contains(childPathStr)) {
- continue;
- }
- if (acceptFile(childPathStr)) {
- LOG.debug("found {}", childPathStr);
- processDiscoveredFile(info);
+
+ if (childStatus.isDirectory() && isRecursive()) {
+ addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
+ scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir);
+ } else if (acceptFile(childPathStr)) {
+ addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
} else {
// don't look at it again
ignoredFiles.add(childPathStr);
@@ -426,6 +443,31 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
}
+ private void addToDiscoveredFiles(Path rootPath, FileStatus parentStatus, FileStatus childStatus,
+ Map<String, Long> lastModifiedTimesForInputDir) throws IOException
+ {
+ Path childPath = childStatus.getPath();
+ String childPathStr = childPath.toUri().getPath();
+ // Directories are always scanned; now check whether the file/directory needs to be added to discoveredFiles.
+ Long oldModificationTime = lastModifiedTimesForInputDir.get(childPathStr);
+ lastModifiedTimesForInputDir.put(childPathStr, childStatus.getModificationTime());
+
+ if (skipFile(childPath, childStatus.getModificationTime(), oldModificationTime) || // skip the dir or file if its timestamp has not changed
+ (childStatus.isDirectory() && (oldModificationTime != null))) { // a directory whose timestamp changed but that is already in the map is also skipped
+ return;
+ }
+
+ if (ignoredFiles.contains(childPathStr)) {
+ return;
+ }
+
+ ScannedFileInfo info = createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus,
+ rootPath);
+
+ LOG.debug("Processing file: " + info.getFilePath());
+ processDiscoveredFile(info);
+ }
+
protected void processDiscoveredFile(ScannedFileInfo info)
{
numDiscoveredPerIteration++;
@@ -619,4 +661,5 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
+
}
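The new inputDirTolastModifiedTimes bookkeeping keeps one modification-time map per configured input directory, so a child is re-discovered only when its timestamp moves forward. A minimal sketch of the lookup pattern (hypothetical paths and times; Guava's Maps as in the commit):

  import java.util.Map;
  import com.google.common.collect.Maps;

  Map<String, Map<String, Long>> inputDirToTimes = Maps.newHashMap();

  // Lazily create the per-directory map, as getLastModifiedTimeMap() does:
  Map<String, Long> timesForDir = inputDirToTimes.get("/data/in");
  if (timesForDir == null) {
    timesForDir = Maps.newHashMap();
    inputDirToTimes.put("/data/in", timesForDir);
  }

  // Record the last seen modification time per child path; on the next scan,
  // skipFile(...) compares the fresh timestamp against this stored value.
  timesForDir.put("/data/in/file1.txt", 1458190930000L);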
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
new file mode 100644
index 0000000..24466d5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+
+/**
+ * HDFSFileSplitter extends {@link FileSplitterInput} to:
+ * 1. Add the relative path to file metadata.
+ * 2. Ignore HDFS temp files (files with the _COPYING_ extension).
+ * 3. Set the sequential read option on readers.
+ */
+public class HDFSFileSplitter extends FileSplitterInput
+{
+ private boolean sequencialFileRead;
+
+ public HDFSFileSplitter()
+ {
+ super();
+ super.setScanner(new HDFSScanner());
+ }
+
+
+ @Override
+ protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
+ {
+ FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
+ blockMetadta.setReadBlockInSequence(sequencialFileRead);
+ return blockMetadta;
+ }
+
+ public boolean isSequencialFileRead()
+ {
+ return sequencialFileRead;
+ }
+
+ public void setSequencialFileRead(boolean sequencialFileRead)
+ {
+ this.sequencialFileRead = sequencialFileRead;
+ }
+
+ /**
+ * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
+ * and files containing unsupported characters.
+ */
+ public static class HDFSScanner extends TimeBasedDirectoryScanner
+ {
+ protected static final String UNSUPPORTED_CHARACTOR = ":";
+ private String ignoreFilePatternRegularExp = ".*._COPYING_";
+ private transient Pattern ignoreRegex;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
+ }
+
+ @Override
+ protected boolean acceptFile(String filePathStr)
+ {
+ boolean accepted = super.acceptFile(filePathStr);
+ if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) {
+ return false;
+ }
+ return accepted;
+ }
+
+ private boolean isIgnoredFile(String filePathStr)
+ {
+ String fileName = new Path(filePathStr).getName();
+ if (ignoreRegex != null) {
+ Matcher matcher = ignoreRegex.matcher(fileName);
+ if (matcher.matches()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean containsUnsupportedCharacters(String filePathStr)
+ {
+ return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
+ }
+
+ public String getIgnoreFilePatternRegularExp()
+ {
+ return ignoreFilePatternRegularExp;
+ }
+
+ public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp)
+ {
+ this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp;
+ this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
+ }
+ }
+
+}
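A hedged configuration sketch (names and the custom pattern are illustrative): sequential read ties into the FileBlockMetadata.hashCode() change above, and a custom ignore pattern replaces, rather than extends, the default ".*._COPYING_".

  HDFSFileSplitter splitter = new HDFSFileSplitter();
  splitter.setSequencialFileRead(true);  // emitted blocks hash on the file path

  HDFSFileSplitter.HDFSScanner scanner = (HDFSFileSplitter.HDFSScanner)splitter.getScanner();
  scanner.setFiles("hdfs://namenode:8020/data/in");
  // Replaces the default ignore pattern ".*._COPYING_":
  scanner.setIgnoreFilePatternRegularExp(".*\\.tmp");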
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
new file mode 100644
index 0000000..2b914f1
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.BlockReader;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * HDFSInputModule is used to read a file, a list of files, or a directory from HDFS. <br/>
+ * The module emits <br/>
+ * 1. FileMetadata 2. BlockMetadata 3. Block bytes.<br/><br/>
+ * The module reads data in parallel; the following parameters can be configured:<br/>
+ * 1. files: list of files/directories to read<br/>
+ * 2. filePatternRegularExp: only file names matching the given regex are read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in the input directory<br/>
+ * 4. recursive: whether to scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of a file<br/>
+ * 6. readersCount: number of readers to read the input file<br/>
+ * 7. sequencialFileRead: whether to emit file blocks in sequence
+ */
+public class HDFSInputModule implements Module
+{
+
+ @NotNull
+ @Size(min = 1)
+ private String files;
+ private String filePatternRegularExp;
+ @Min(0)
+ private long scanIntervalMillis;
+ private boolean recursive = true;
+ private long blockSize;
+ private boolean sequencialFileRead = false;
+ private int readersCount;
+
+ public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
+ public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
+ public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
+ BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
+
+ dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
+
+ filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
+ blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
+ messages.set(blockReader.messages);
+
+ fileSplitter.setSequencialFileRead(sequencialFileRead);
+ if (blockSize != 0) {
+ fileSplitter.setBlockSize(blockSize);
+ }
+
+ HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
+ fileScanner.setFiles(files);
+ if (scanIntervalMillis != 0) {
+ fileScanner.setScanIntervalMillis(scanIntervalMillis);
+ }
+ fileScanner.setRecursive(recursive);
+ if (filePatternRegularExp != null) {
+ fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
+ }
+
+ blockReader.setUri(files);
+ if (readersCount != 0) {
+ dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
+ }
+ }
+
+ /**
+ * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used.
+ * A fully qualified path can be provided to scan directories in other filesystems.
+ *
+ * @param files
+ * files
+ */
+ public void setFiles(String files)
+ {
+ this.files = files;
+ }
+
+ /**
+ * Gets the files to be scanned.
+ *
+ * @return files to be scanned.
+ */
+ public String getFiles()
+ {
+ return files;
+ }
+
+ /**
+ * Gets the regular expression for file names to split
+ *
+ * @return regular expression
+ */
+ public String getFilePatternRegularExp()
+ {
+ return filePatternRegularExp;
+ }
+
+ /**
+ * Only files with names matching the given Java regular expression are split
+ *
+ * @param filePatternRegexp
+ * regular expression
+ */
+ public void setFilePatternRegularExp(String filePatternRegexp)
+ {
+ this.filePatternRegularExp = filePatternRegexp;
+ }
+
+ /**
+ * Gets the scan interval in milliseconds: the interval between two scans to discover new files in the input directory
+ *
+ * @return scanInterval milliseconds
+ */
+ public long getScanIntervalMillis()
+ {
+ return scanIntervalMillis;
+ }
+
+ /**
+ * Sets the scan interval in milliseconds: the interval between two scans to discover new files in the input directory
+ *
+ * @param scanIntervalMillis
+ */
+ public void setScanIntervalMillis(long scanIntervalMillis)
+ {
+ this.scanIntervalMillis = scanIntervalMillis;
+ }
+
+ /**
+ * Gets whether the scan is recursive
+ *
+ * @return isRecursive
+ */
+ public boolean isRecursive()
+ {
+ return recursive;
+ }
+
+ /**
+ * Sets whether the scan is recursive
+ *
+ * @param recursive
+ */
+ public void setRecursive(boolean recursive)
+ {
+ this.recursive = recursive;
+ }
+
+ /**
+ * Gets the block size used to read input blocks of a file
+ *
+ * @return blockSize
+ */
+ public long getBlockSize()
+ {
+ return blockSize;
+ }
+
+ /**
+ * Sets the block size used to read input blocks of a file
+ *
+ * @param blockSize
+ */
+ public void setBlockSize(long blockSize)
+ {
+ this.blockSize = blockSize;
+ }
+
+ /**
+ * Gets readers count
+ * @return readersCount
+ */
+ public int getReadersCount()
+ {
+ return readersCount;
+ }
+
+ /**
+ * Sets the static count of readers to read the input file
+ * @param readersCount
+ */
+ public void setReadersCount(int readersCount)
+ {
+ this.readersCount = readersCount;
+ }
+
+ /**
+ * Gets whether file blocks are read in sequence
+ *
+ * @return sequencialFileRead
+ */
+ public boolean isSequencialFileRead()
+ {
+ return sequencialFileRead;
+ }
+
+ /**
+ * Sets whether file blocks are read in sequence
+ *
+ * @param sequencialFileRead
+ */
+ public void setSequencialFileRead(boolean sequencialFileRead)
+ {
+ this.sequencialFileRead = sequencialFileRead;
+ }
+
+}
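For wiring, a minimal application sketch (module name, URI, and values are assumptions; it mirrors the test application below):

  // Inside StreamingApplication.populateDAG(DAG dag, Configuration conf):
  HDFSInputModule input = dag.addModule("hdfsInputModule", HDFSInputModule.class);
  input.setFiles("hdfs://namenode:8020/data/in");
  input.setBlockSize(1024 * 1024);    // optional; 0 keeps the splitter default
  input.setReadersCount(4);           // statically partitions the BlockReader
  input.setSequencialFileRead(true);  // all blocks of a file go to one reader
  // Connect input.filesMetadataOutput, input.blocksMetadataOutput and
  // input.messages to downstream operators via dag.addStream(...).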
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cd0de2d..1d6cf03 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -71,7 +71,7 @@ public class FileSplitterInputTest
}
allLines.addAll(lines);
File created = new File(dataDirectory, "file" + file + ".txt");
- filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString());
+ filePaths.add(created.getAbsolutePath());
FileUtils.write(created, StringUtils.join(lines, '\n'));
}
return filePaths;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
new file mode 100644
index 0000000..8bb1e26
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.stream.DevNull;
+import com.datatorrent.netlet.util.Slice;
+
+public class HDFSInputModuleAppTest
+{
+ private String inputDir;
+ static String outputDir;
+ private StreamingApplication app;
+ private static final String FILE_1 = "file1.txt";
+ private static final String FILE_2 = "file2.txt";
+ private static final String FILE_1_DATA = "File one data";
+ private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
+ static final String OUT_DATA_FILE = "fileData.txt";
+ static final String OUT_METADATA_FILE = "fileMetaData.txt";
+
+ public static class TestMeta extends TestWatcher
+ {
+ public String baseDirectory;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+ }
+
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Before
+ public void setup() throws Exception
+ {
+ inputDir = testMeta.baseDirectory + File.separator + "input";
+ outputDir = testMeta.baseDirectory + File.separator + "output";
+
+ FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+ FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+ FileUtils.forceMkdir(new File(inputDir + File.separator + "dir"));
+ FileUtils.writeStringToFile(new File(inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA);
+ }
+
+ @After
+ public void tearDown() throws IOException
+ {
+ FileUtils.deleteDirectory(new File(inputDir));
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ app = new Application();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
+ conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
+ conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+
+ long now = System.currentTimeMillis();
+ Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+ FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+ while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
+ Thread.sleep(500);
+ LOG.debug("Waiting for {}", outDir);
+ }
+
+ Thread.sleep(10000);
+ lc.shutdown();
+
+ Assert.assertTrue("output dir does not exist", fs.exists(outDir));
+
+ File dir = new File(outputDir);
+ FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
+ verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
+ verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
+ verifyFileContents(dir.listFiles(fileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]");
+ verifyFileContents(dir.listFiles(fileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]");
+
+ fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
+ verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
+ verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
+ }
+
+ private void verifyFileContents(File[] files, String expectedData) throws IOException
+ {
+ StringBuilder filesData = new StringBuilder();
+ for (File file : files) {
+ filesData.append(FileUtils.readFileToString(file));
+ }
+ Assert.assertTrue("File data doesn't contain expected text", filesData.indexOf(expectedData) > -1);
+ }
+
+ private static Logger LOG = LoggerFactory.getLogger(HDFSInputModuleAppTest.class);
+
+ private static class Application implements StreamingApplication
+ {
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ HDFSInputModule module = dag.addModule("hdfsInputModule", HDFSInputModule.class);
+
+ AbstractFileOutputOperator<FileMetadata> metadataWriter = new MetadataWriter(HDFSInputModuleAppTest.OUT_METADATA_FILE);
+ metadataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
+ dag.addOperator("FileMetadataWriter", metadataWriter);
+
+ AbstractFileOutputOperator<ReaderRecord<Slice>> dataWriter = new HDFSFileWriter(HDFSInputModuleAppTest.OUT_DATA_FILE);
+ dataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
+ dag.addOperator("FileDataWriter", dataWriter);
+
+ DevNull<FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
+
+ dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
+ dag.addStream("data", module.messages, dataWriter.input);
+ dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
+ }
+ }
+
+ private static class MetadataWriter extends AbstractFileOutputOperator<FileMetadata>
+ {
+ String fileName;
+
+ @SuppressWarnings("unused")
+ private MetadataWriter()
+ {
+
+ }
+
+ public MetadataWriter(String fileName)
+ {
+ this.fileName = fileName;
+ }
+
+ @Override
+ protected String getFileName(FileMetadata tuple)
+ {
+ return fileName;
+ }
+
+ @Override
+ protected byte[] getBytesForTuple(FileMetadata tuple)
+ {
+ return (tuple).toString().getBytes();
+ }
+ }
+
+ private static class HDFSFileWriter extends AbstractFileOutputOperator<ReaderRecord<Slice>>
+ {
+ String fileName;
+
+ @SuppressWarnings("unused")
+ private HDFSFileWriter()
+ {
+ }
+
+ public HDFSFileWriter(String fileName)
+ {
+ this.fileName = fileName;
+ }
+
+ @Override
+ protected String getFileName(ReaderRecord<Slice> tuple)
+ {
+ return fileName;
+ }
+
+ @Override
+ protected byte[] getBytesForTuple(ReaderRecord<Slice> tuple)
+ {
+ return tuple.getRecord().buffer;
+ }
+ }
+
+}
[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2008-hdfs-input-module' of https://github.com/DT-Priyanka/incubator-apex-malhar
Posted by is...@apache.org.
Merge branch 'APEXMALHAR-2008-hdfs-input-module' of https://github.com/DT-Priyanka/incubator-apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/becee7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/becee7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/becee7f8
Branch: refs/heads/master
Commit: becee7f82dbca5d975d92dc45ba5f771f8682ea8
Parents: 51a19e1 f9fe3d5
Author: ishark <is...@datatorrent.com>
Authored: Wed Mar 16 15:30:26 2016 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Wed Mar 16 15:30:26 2016 -0700
----------------------------------------------------------------------
.../datatorrent/lib/io/block/BlockMetadata.java | 31 ++-
.../datatorrent/lib/io/block/BlockReader.java | 66 +++++
.../lib/io/fs/AbstractFileSplitter.java | 45 +++-
.../lib/io/fs/FileSplitterInput.java | 81 +++++--
.../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++
.../datatorrent/lib/io/fs/HDFSInputModule.java | 243 +++++++++++++++++++
.../lib/io/fs/FileSplitterInputTest.java | 2 +-
.../lib/io/fs/HDFSInputModuleAppTest.java | 221 +++++++++++++++++
8 files changed, 786 insertions(+), 23 deletions(-)
----------------------------------------------------------------------