You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by is...@apache.org on 2016/03/17 08:57:22 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2008: HDFS File Input module

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 51a19e1be -> becee7f82


APEXMALHAR-2008: HDFS File Input module


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f9fe3d5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f9fe3d5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f9fe3d5e

Branch: refs/heads/master
Commit: f9fe3d5e9e5ef06ebba313a9a09fa268ec9ead7c
Parents: d3a7063
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Fri Mar 11 13:22:10 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Wed Mar 16 15:07:08 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  31 ++-
 .../datatorrent/lib/io/block/BlockReader.java   |  66 +++++
 .../lib/io/fs/AbstractFileSplitter.java         |  45 +++-
 .../lib/io/fs/FileSplitterInput.java            |  81 +++++--
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++
 .../datatorrent/lib/io/fs/HDFSInputModule.java  | 243 +++++++++++++++++++
 .../lib/io/fs/FileSplitterInputTest.java        |   2 +-
 .../lib/io/fs/HDFSInputModuleAppTest.java       | 221 +++++++++++++++++
 8 files changed, 786 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 534024d..6e38e45 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -196,11 +196,13 @@ public interface BlockMetadata
   }
 
   /**
-   * A block of file which contains file path adn other block properties.
+   * A block of file which contains file path and other block properties.
+   * It also controls if blocks should be read in sequence
    */
   class FileBlockMetadata extends AbstractBlockMetadata
   {
     private final String filePath;
+    private boolean readBlockInSequence = false;
 
     protected FileBlockMetadata()
     {
@@ -225,10 +227,37 @@ public interface BlockMetadata
       return filePath;
     }
 
+    /**
+     * Get if blocks should be read in sequence
+     * @return readBlockInSequence
+     */
+    public boolean isReadBlockInSequence()
+    {
+      return readBlockInSequence;
+    }
+
+    /**
+     * Set if blokcs should be read in sequence
+     * @param readBlockInSequence
+     */
+    public void setReadBlockInSequence(boolean readBlockInSequence)
+    {
+      this.readBlockInSequence = readBlockInSequence;
+    }
+
     public FileBlockMetadata newInstance(@NotNull String filePath)
     {
       Preconditions.checkNotNull(filePath);
       return new FileBlockMetadata(filePath);
     }
+
+    @Override
+    public int hashCode()
+    {
+      if (isReadBlockInSequence()) {
+        return getFilePath().hashCode();
+      }
+      return super.hashCode();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
new file mode 100644
index 0000000..f4f7d76
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.block;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
+ */
+public class BlockReader extends FSSliceReader
+{
+  @AutoMetric
+  private long bytesRead;
+
+  protected String uri;
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    bytesRead = 0;
+  }
+
+  @Override
+  protected FileSystem getFSInstance() throws IOException
+  {
+    return FileSystem.newInstance(URI.create(uri), configuration);
+  }
+
+  /**
+   * Sets the uri
+   *
+   * @param uri of form hdfs://hostname:port/path/to/input
+   */
+  public void setUri(String uri)
+  {
+    this.uri = uri;
+  }
+
+  public String getUri()
+  {
+    return uri;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index cd47d48..b39168c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.lib.io.fs;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -27,12 +28,10 @@ import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Preconditions;
-
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultOutputPort;
@@ -206,6 +205,13 @@ public abstract class AbstractFileSplitter extends BaseOperator
     fileMetadata.setDirectory(status.isDirectory());
     fileMetadata.setFileLength(status.getLen());
 
+    if (fileInfo.getDirectoryPath() == null) { // Direct filename is given as input.
+      fileMetadata.setRelativePath(status.getPath().getName());
+    } else {
+      String relativePath = getRelativePathWithFolderName(fileInfo);
+      fileMetadata.setRelativePath(relativePath);
+    }
+
     if (!status.isDirectory()) {
       int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
       if (fileMetadata.getDataOffset() >= status.getLen()) {
@@ -217,6 +223,15 @@ public abstract class AbstractFileSplitter extends BaseOperator
     return fileMetadata;
   }
 
+  /*
+   * As folder name was given to input for copy, prefix folder name to the sub items to copy.
+   */
+  private String getRelativePathWithFolderName(FileInfo fileInfo)
+  {
+    String parentDir = new Path(fileInfo.getDirectoryPath()).getName();
+    return parentDir + File.separator + fileInfo.getRelativeFilePath();
+  }
+
   /**
    * This can be over-ridden to create file metadata of type that extends {@link FileSplitterInput.FileMetadata}
    *
@@ -346,6 +361,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
     private long discoverTime;
     private long[] blockIds;
     private boolean isDirectory;
+    private String relativePath;
 
     @SuppressWarnings("unused")
     protected FileMetadata()
@@ -493,6 +509,31 @@ public abstract class AbstractFileSplitter extends BaseOperator
     {
       return isDirectory;
     }
+
+    /**
+     * Sets relative file path
+     * @return relativePath
+     */
+    public String getRelativePath()
+    {
+      return relativePath;
+    }
+
+    /**
+     * Gets relative file path
+     * @param relativePath
+     */
+    public void setRelativePath(String relativePath)
+    {
+      this.relativePath = relativePath;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "FileMetadata [fileName=" + fileName + ", numberOfBlocks=" + numberOfBlocks + ", isDirectory=" + isDirectory + ", relativePath=" + relativePath + "]";
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index ab70047..1d8248f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -292,6 +292,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     private transient ScannedFileInfo lastScannedInfo;
     private transient int numDiscoveredPerIteration;
 
+    @NotNull
+    protected final Map<String, Map<String, Long>> inputDirTolastModifiedTimes;
+
     public TimeBasedDirectoryScanner()
     {
       recursive = true;
@@ -301,6 +304,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       discoveredFiles = new LinkedBlockingDeque<>();
       atomicThrowable = new AtomicReference<>();
       ignoredFiles = Sets.newHashSet();
+      inputDirTolastModifiedTimes = Maps.newHashMap();
     }
 
     @Override
@@ -360,7 +364,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
             lastScannedInfo = null;
             numDiscoveredPerIteration = 0;
             for (String afile : files) {
-              scan(new Path(afile), null);
+              Map<String, Long> lastModifiedTimesForInputDir;
+              lastModifiedTimesForInputDir = getLastModifiedTimeMap(afile);
+              scan(new Path(afile), null, lastModifiedTimesForInputDir);
             }
             scanIterationComplete();
           } else {
@@ -375,6 +381,15 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       }
     }
 
+    private Map<String, Long> getLastModifiedTimeMap(String key)
+    {
+      if (inputDirTolastModifiedTimes.get(key) == null) {
+        Map<String, Long> modifiedTimeMap = Maps.newHashMap();
+        inputDirTolastModifiedTimes.put(key, modifiedTimeMap);
+      }
+      return inputDirTolastModifiedTimes.get(key);
+    }
+
     /**
      * Operations that need to be done once a scan is complete.
      */
@@ -386,6 +401,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
     protected void scan(@NotNull Path filePath, Path rootPath)
     {
+      Map<String, Long> lastModifiedTimesForInputDir;
+      lastModifiedTimesForInputDir = getLastModifiedTimeMap(filePath.toUri().getPath());
+      scan(filePath, rootPath, lastModifiedTimesForInputDir);
+    }
+
+    private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir)
+    {
       try {
         FileStatus parentStatus = fs.getFileStatus(filePath);
         String parentPathStr = filePath.toUri().getPath();
@@ -393,27 +415,22 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
         LOG.debug("scan {}", parentPathStr);
 
         FileStatus[] childStatuses = fs.listStatus(filePath);
-        for (FileStatus status : childStatuses) {
-          Path childPath = status.getPath();
-          ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath);
-
-          if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) {
-            continue;
-          }
 
-          if (status.isDirectory()) {
-            if (recursive) {
-              scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
-            }
-          }
+        if (childStatuses.length == 0 && rootPath == null && lastModifiedTimesForInputDir.get(parentPathStr) == null) { // empty input directory copy as is
+          ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime());
+          processDiscoveredFile(info);
+          lastModifiedTimesForInputDir.put(parentPathStr, parentStatus.getModificationTime());
+        }
 
+        for (FileStatus childStatus : childStatuses) {
+          Path childPath = childStatus.getPath();
           String childPathStr = childPath.toUri().getPath();
-          if (ignoredFiles.contains(childPathStr)) {
-            continue;
-          }
-          if (acceptFile(childPathStr)) {
-            LOG.debug("found {}", childPathStr);
-            processDiscoveredFile(info);
+
+          if (childStatus.isDirectory() && isRecursive()) {
+            addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
+            scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir);
+          } else if (acceptFile(childPathStr)) {
+            addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
           } else {
             // don't look at it again
             ignoredFiles.add(childPathStr);
@@ -426,6 +443,31 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       }
     }
 
+    private void addToDiscoveredFiles(Path rootPath, FileStatus parentStatus, FileStatus childStatus,
+        Map<String, Long> lastModifiedTimesForInputDir) throws IOException
+    {
+      Path childPath = childStatus.getPath();
+      String childPathStr = childPath.toUri().getPath();
+      // Directory by now is scanned forcibly. Now check for whether file/directory needs to be added to discoveredFiles.
+      Long oldModificationTime = lastModifiedTimesForInputDir.get(childPathStr);
+      lastModifiedTimesForInputDir.put(childPathStr, childStatus.getModificationTime());
+
+      if (skipFile(childPath, childStatus.getModificationTime(), oldModificationTime) || // Skip dir or file if no timestamp modification
+          (childStatus.isDirectory() && (oldModificationTime != null))) { // If timestamp modified but if its a directory and already present in map, then skip.
+        return;
+      }
+
+      if (ignoredFiles.contains(childPathStr)) {
+        return;
+      }
+
+      ScannedFileInfo info = createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus,
+          rootPath);
+
+      LOG.debug("Processing file: " + info.getFilePath());
+      processDiscoveredFile(info);
+    }
+
     protected void processDiscoveredFile(ScannedFileInfo info)
     {
       numDiscoveredPerIteration++;
@@ -619,4 +661,5 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
new file mode 100644
index 0000000..24466d5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+
+/**
+ * HDFSFileSplitter extends {@link FileSplitterInput} to,
+ * 1. Add relative path to file metadata.
+ * 2. Ignore HDFS temp files (files with extensions _COPYING_).
+ * 3. Set sequencial read option on readers.
+ */
+public class HDFSFileSplitter extends FileSplitterInput
+{
+  private boolean sequencialFileRead;
+
+  public HDFSFileSplitter()
+  {
+    super();
+    super.setScanner(new HDFSScanner());
+  }
+
+
+  @Override
+  protected FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
+  {
+    FileBlockMetadata blockMetadta = new FileBlockMetadata(fileMetadata.getFilePath());
+    blockMetadta.setReadBlockInSequence(sequencialFileRead);
+    return blockMetadta;
+  }
+
+  public boolean isSequencialFileRead()
+  {
+    return sequencialFileRead;
+  }
+
+  public void setSequencialFileRead(boolean sequencialFileRead)
+  {
+    this.sequencialFileRead = sequencialFileRead;
+  }
+
+  /**
+   * HDFSScanner extends {@link TimeBasedDirectoryScanner} to ignore HDFS temporary files
+   * and files containing unsupported characters. 
+   */
+  public static class HDFSScanner extends TimeBasedDirectoryScanner
+  {
+    protected static final String UNSUPPORTED_CHARACTOR = ":";
+    private String ignoreFilePatternRegularExp = ".*._COPYING_";
+    private transient Pattern ignoreRegex;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
+    }
+
+    @Override
+    protected boolean acceptFile(String filePathStr)
+    {
+      boolean accepted = super.acceptFile(filePathStr);
+      if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) {
+        return false;
+      }
+      return accepted;
+    }
+
+    private boolean isIgnoredFile(String filePathStr)
+    {
+      String fileName = new Path(filePathStr).getName();
+      if (ignoreRegex != null) {
+        Matcher matcher = ignoreRegex.matcher(fileName);
+        if (matcher.matches()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private boolean containsUnsupportedCharacters(String filePathStr)
+    {
+      return new Path(filePathStr).toUri().getPath().contains(UNSUPPORTED_CHARACTOR);
+    }
+
+    public String getIgnoreFilePatternRegularExp()
+    {
+      return ignoreFilePatternRegularExp;
+    }
+
+    public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp)
+    {
+      this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp;
+      this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
new file mode 100644
index 0000000..2b914f1
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.BlockReader;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.io.fs.HDFSFileSplitter.HDFSScanner;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
+ * Module emits, <br/>
+ * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
+ * The module reads data in parallel, following parameters can be configured<br/>
+ * 1. files: list of file(s)/directories to read<br/>
+ * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
+ * 4. recursive: if scan recursively input directories<br/>
+ * 5. blockSize: block size used to read input blocks of file<br/>
+ * 6. readersCount: count of readers to read input file<br/>
+ * 7. sequencialFileRead: If emit file blocks in sequence?
+ */
+public class HDFSInputModule implements Module
+{
+
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private long blockSize;
+  private boolean sequencialFileRead = false;
+  private int readersCount;
+
+  public final transient ProxyOutputPort<FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
+  public final transient ProxyOutputPort<ReaderRecord<Slice>> messages = new ProxyOutputPort<>();
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    HDFSFileSplitter fileSplitter = dag.addOperator("FileSplitter", new HDFSFileSplitter());
+    BlockReader blockReader = dag.addOperator("BlockReader", new BlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
+
+    filesMetadataOutput.set(fileSplitter.filesMetadataOutput);
+    blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
+    messages.set(blockReader.messages);
+
+    fileSplitter.setSequencialFileRead(sequencialFileRead);
+    if (blockSize != 0) {
+      fileSplitter.setBlockSize(blockSize);
+    }
+
+    HDFSScanner fileScanner = (HDFSScanner)fileSplitter.getScanner();
+    fileScanner.setFiles(files);
+    if (scanIntervalMillis != 0) {
+      fileScanner.setScanIntervalMillis(scanIntervalMillis);
+    }
+    fileScanner.setRecursive(recursive);
+    if (filePatternRegularExp != null) {
+      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
+    }
+
+    blockReader.setUri(files);
+    if (readersCount != 0) {
+      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
+    }
+  }
+
+  /**
+   * A comma separated list of directories to scan. If the path is not fully qualified the default file system is used.
+   * A fully qualified path can be provided to scan directories in other filesystems.
+   *
+   * @param files
+   *          files
+   */
+  public void setFiles(String files)
+  {
+    this.files = files;
+  }
+
+  /**
+   * Gets the files to be scanned.
+   *
+   * @return files to be scanned.
+   */
+  public String getFiles()
+  {
+    return files;
+  }
+
+  /**
+   * Gets the regular expression for file names to split
+   *
+   * @return regular expression
+   */
+  public String getFilePatternRegularExp()
+  {
+    return filePatternRegularExp;
+  }
+
+  /**
+   * Only files with names matching the given java regular expression are split
+   *
+   * @param filePatternRegexp
+   *          regular expression
+   */
+  public void setFilePatternRegularExp(String filePatternRegexp)
+  {
+    this.filePatternRegularExp = filePatternRegexp;
+  }
+
+  /**
+   * Gets scan interval in milliseconds, interval between two scans to discover new files in input directory
+   *
+   * @return scanInterval milliseconds
+   */
+  public long getScanIntervalMillis()
+  {
+    return scanIntervalMillis;
+  }
+
+  /**
+   * Sets scan interval in milliseconds, interval between two scans to discover new files in input directory
+   *
+   * @param scanIntervalMillis
+   */
+  public void setScanIntervalMillis(long scanIntervalMillis)
+  {
+    this.scanIntervalMillis = scanIntervalMillis;
+  }
+
+  /**
+   * Get is scan recursive
+   *
+   * @return isRecursive
+   */
+  public boolean isRecursive()
+  {
+    return recursive;
+  }
+
+  /**
+   * set is scan recursive
+   *
+   * @param recursive
+   */
+  public void setRecursive(boolean recursive)
+  {
+    this.recursive = recursive;
+  }
+
+  /**
+   * Get block size used to read input blocks of file
+   *
+   * @return blockSize
+   */
+  public long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  /**
+   * Sets block size used to read input blocks of file
+   *
+   * @param blockSize
+   */
+  public void setBlockSize(long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Gets readers count
+   * @return readersCount
+   */
+  public int getReadersCount()
+  {
+    return readersCount;
+  }
+
+  /**
+   * Static count of readers to read input file
+   * @param readersCount
+   */
+  public void setReadersCount(int readersCount)
+  {
+    this.readersCount = readersCount;
+  }
+
+  /**
+   * Gets is sequencial file read
+   * 
+   * @return sequencialFileRead
+   */
+  public boolean isSequencialFileRead()
+  {
+    return sequencialFileRead;
+  }
+
+  /**
+   * Sets is sequencial file read
+   *
+   * @param sequencialFileRead
+   */
+  public void setSequencialFileRead(boolean sequencialFileRead)
+  {
+    this.sequencialFileRead = sequencialFileRead;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cd0de2d..1d6cf03 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -71,7 +71,7 @@ public class FileSplitterInputTest
       }
       allLines.addAll(lines);
       File created = new File(dataDirectory, "file" + file + ".txt");
-      filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString());
+      filePaths.add(created.getAbsolutePath());
       FileUtils.write(created, StringUtils.join(lines, '\n'));
     }
     return filePaths;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f9fe3d5e/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
new file mode 100644
index 0000000..8bb1e26
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.stream.DevNull;
+import com.datatorrent.netlet.util.Slice;
+
+public class HDFSInputModuleAppTest
+{
+  private String inputDir;
+  static String outputDir;
+  private StreamingApplication app;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "File one data";
+  private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
+  static final String OUT_DATA_FILE = "fileData.txt";
+  static final String OUT_METADATA_FILE = "fileMetaData.txt";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+    outputDir = testMeta.baseDirectory + File.separator + "output";
+
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+    FileUtils.forceMkdir(new File(inputDir + File.separator + "dir"));
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA);
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(inputDir));
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    app = new Application();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
+    conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
+    conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+    FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+    while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
+      Thread.sleep(500);
+      LOG.debug("Waiting for {}", outDir);
+    }
+
+    Thread.sleep(10000);
+    lc.shutdown();
+
+    Assert.assertTrue("output dir does not exist", fs.exists(outDir));
+
+    File dir = new File(outputDir);
+    FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]");
+
+    fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
+    verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
+  }
+
+  private void verifyFileContents(File[] files, String expectedData) throws IOException
+  {
+    StringBuilder filesData = new StringBuilder();
+    for (File file : files) {
+      filesData.append(FileUtils.readFileToString(file));
+    }
+    Assert.assertTrue("File data doesn't contain expected text" , filesData.indexOf(expectedData) > -1);
+  }
+
+  private static Logger LOG = LoggerFactory.getLogger(HDFSInputModuleAppTest.class);
+
+  private static class Application implements StreamingApplication
+  {
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      HDFSInputModule module = dag.addModule("hdfsInputModule", HDFSInputModule.class);
+
+      AbstractFileOutputOperator<FileMetadata> metadataWriter = new MetadataWriter(HDFSInputModuleAppTest.OUT_METADATA_FILE);
+      metadataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
+      dag.addOperator("FileMetadataWriter", metadataWriter);
+
+      AbstractFileOutputOperator<ReaderRecord<Slice>> dataWriter = new HDFSFileWriter(HDFSInputModuleAppTest.OUT_DATA_FILE);
+      dataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
+      dag.addOperator("FileDataWriter", dataWriter);
+
+      DevNull<FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
+
+      dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
+      dag.addStream("data", module.messages, dataWriter.input);
+      dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
+    }
+  }
+
+  private static class MetadataWriter extends AbstractFileOutputOperator<FileMetadata>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private MetadataWriter()
+    {
+
+    }
+
+    public MetadataWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(FileMetadata tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(FileMetadata tuple)
+    {
+      return (tuple).toString().getBytes();
+    }
+  }
+
+  private static class HDFSFileWriter extends AbstractFileOutputOperator<ReaderRecord<Slice>>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private HDFSFileWriter()
+    {
+    }
+
+    public HDFSFileWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(ReaderRecord<Slice> tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(ReaderRecord<Slice> tuple)
+    {
+      return tuple.getRecord().buffer;
+    }
+  }
+
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2008-hdfs-input-module' of https://github.com/DT-Priyanka/incubator-apex-malhar

Posted by is...@apache.org.
Merge branch 'APEXMALHAR-2008-hdfs-input-module' of https://github.com/DT-Priyanka/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/becee7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/becee7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/becee7f8

Branch: refs/heads/master
Commit: becee7f82dbca5d975d92dc45ba5f771f8682ea8
Parents: 51a19e1 f9fe3d5
Author: ishark <is...@datatorrent.com>
Authored: Wed Mar 16 15:30:26 2016 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Wed Mar 16 15:30:26 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  31 ++-
 .../datatorrent/lib/io/block/BlockReader.java   |  66 +++++
 .../lib/io/fs/AbstractFileSplitter.java         |  45 +++-
 .../lib/io/fs/FileSplitterInput.java            |  81 +++++--
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java | 120 +++++++++
 .../datatorrent/lib/io/fs/HDFSInputModule.java  | 243 +++++++++++++++++++
 .../lib/io/fs/FileSplitterInputTest.java        |   2 +-
 .../lib/io/fs/HDFSInputModuleAppTest.java       | 221 +++++++++++++++++
 8 files changed, 786 insertions(+), 23 deletions(-)
----------------------------------------------------------------------