Posted to commits@apex.apache.org by pr...@apache.org on 2016/05/13 21:30:38 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2081 removed HDFS specific classes and moved useful properties to the base class

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 2459f6c02 -> af6075bfa


APEXMALHAR-2081 removed HDFS specific classes and moved useful properties to the base class
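
For code that used the removed classes, the generic base classes are direct replacements. A minimal migration sketch, with an illustrative operator name and path (not taken from this commit; setFiles is assumed from the module's "files" property):

  // Before: the HDFS-specific module had to be used
  //   HDFSInputModule input = dag.addModule("Input", new HDFSInputModule());
  // After: FSInputModule is concrete and is used directly
  FSInputModule input = dag.addModule("Input", new FSInputModule());
  input.setFiles("hdfs://namenode:8020/path/to/input"); // example URI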


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b037ae20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b037ae20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b037ae20

Branch: refs/heads/master
Commit: b037ae20e2da914d41e8bca4cefa151f82871721
Parents: 72de840
Author: Chandni Singh <cs...@apache.org>
Authored: Mon May 9 12:56:40 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon May 9 14:22:42 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/apps/copy/HDFSFileCopyApp.java  |   4 +-
 .../lib/io/block/AbstractBlockReader.java       |   6 +
 .../lib/io/block/AbstractFSBlockReader.java     |  26 ++-
 .../datatorrent/lib/io/block/BlockMetadata.java |  28 ---
 .../datatorrent/lib/io/block/BlockReader.java   |  66 ------
 .../datatorrent/lib/io/fs/FSFileSplitter.java   | 130 -----------
 .../datatorrent/lib/io/fs/FSInputModule.java    |  40 +++-
 .../lib/io/fs/FileSplitterInput.java            |  37 ++++
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java |  37 ----
 .../datatorrent/lib/io/fs/HDFSInputModule.java  |  49 ----
 .../lib/io/fs/FSInputModuleAppTest.java         | 221 +++++++++++++++++++
 .../lib/io/fs/HDFSInputModuleAppTest.java       | 221 -------------------
 12 files changed, 321 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/apps/filecopy/src/main/java/com/datatorrent/apps/copy/HDFSFileCopyApp.java
----------------------------------------------------------------------
diff --git a/apps/filecopy/src/main/java/com/datatorrent/apps/copy/HDFSFileCopyApp.java b/apps/filecopy/src/main/java/com/datatorrent/apps/copy/HDFSFileCopyApp.java
index d735988..1d8f2e5 100644
--- a/apps/filecopy/src/main/java/com/datatorrent/apps/copy/HDFSFileCopyApp.java
+++ b/apps/filecopy/src/main/java/com/datatorrent/apps/copy/HDFSFileCopyApp.java
@@ -25,8 +25,8 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.fs.FSInputModule;
 import com.datatorrent.lib.io.fs.HDFSFileCopyModule;
-import com.datatorrent.lib.io.fs.HDFSInputModule;
 
 /**
  * Application for HDFS to HDFS file copy
@@ -38,7 +38,7 @@ public class HDFSFileCopyApp implements StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
 
-    HDFSInputModule inputModule = dag.addModule("HDFSInputModule", new HDFSInputModule());
+    FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
     HDFSFileCopyModule outputModule = dag.addModule("HDFSFileCopyModule", new HDFSFileCopyModule());
 
     dag.addStream("FileMetaData", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index a6d1bd9..268a17b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.PositionedReadable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
@@ -127,6 +128,9 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
   protected transient long lastBlockOpenTime;
   protected transient boolean consecutiveBlock;
 
+  @AutoMetric
+  private long bytesRead;
+
   public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<>();
   public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<>();
 
@@ -171,6 +175,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
   {
     this.windowId = windowId;
     blocksPerWindow = 0;
+    bytesRead = 0;
   }
 
   @Override
@@ -247,6 +252,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
     while ((entity = readerContext.next()) != null) {
 
       counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+      bytesRead += entity.getUsedBytes();
 
       R record = convertToRecord(entity.getRecord());
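
The bytesRead field added above is reset in beginWindow() and published once per window as an @AutoMetric. As a sketch only (not part of this commit; it assumes the standard Apex AutoMetric.Aggregator API), the values reported by each physical partition could be summed into one logical metric:

  import java.io.Serializable;
  import java.util.Collection;
  import java.util.Map;

  import com.google.common.collect.Maps;

  import com.datatorrent.api.AutoMetric;

  /** Sums the per-partition "bytesRead" values into a single logical metric. */
  public class BytesReadAggregator implements AutoMetric.Aggregator, Serializable
  {
    @Override
    public Map<String, Object> aggregate(long windowId,
        Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
    {
      long total = 0;
      for (AutoMetric.PhysicalMetricsContext ctx : physicalMetrics) {
        Number bytes = (Number)ctx.getMetrics().get("bytesRead");
        if (bytes != null) {
          total += bytes.longValue();
        }
      }
      Map<String, Object> result = Maps.newHashMap();
      result.put("totalBytesRead", total);
      return result;
    }

    private static final long serialVersionUID = 1L;
  }

It would be attached via dag.setAttribute(reader, Context.OperatorContext.METRICS_AGGREGATOR, new BytesReadAggregator()), assuming that attribute is available in the Apex version in use.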
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
index d74c9c9..3eccb3e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java
@@ -19,6 +19,7 @@
 package com.datatorrent.lib.io.block;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -42,6 +43,12 @@ public abstract class AbstractFSBlockReader<R>
   protected transient FileSystem fs;
   protected transient Configuration configuration;
 
+  /**
+   * If all the blocks belong to files under a single folder, then this base can be set to that folder.
+   * The FileSystem instance is derived using this base path.
+   */
+  protected String basePath;
+
   @Override
   public void setup(Context.OperatorContext context)
   {
@@ -79,7 +86,11 @@ public abstract class AbstractFSBlockReader<R>
    */
   protected FileSystem getFSInstance() throws IOException
   {
-    return FileSystem.newInstance(configuration);
+    if (basePath != null) {
+      return FileSystem.newInstance(URI.create(basePath), configuration);
+    } else {
+      return FileSystem.newInstance(configuration);
+    }
   }
 
   /**
@@ -111,4 +122,17 @@ public abstract class AbstractFSBlockReader<R>
       this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>();
     }
   }
+
+  /**
+   * Sets the base path.
+   */
+  public void setBasePath(String basePath)
+  {
+    this.basePath = basePath;
+  }
+
+  public String getBasePath()
+  {
+    return basePath;
+  }
 }
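
With the HDFS-specific reader removed (below), the basePath added here covers the same use case on any Hadoop-supported file system. A small sketch, assuming all blocks come from files under one folder (the URI is an example):

  FSSliceReader reader = new FSSliceReader();
  // The FileSystem instance is derived from this path in getFSInstance().
  reader.setBasePath("hdfs://namenode:8020/user/data/input");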

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 6e38e45..25b3a75 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -202,7 +202,6 @@ public interface BlockMetadata
   class FileBlockMetadata extends AbstractBlockMetadata
   {
     private final String filePath;
-    private boolean readBlockInSequence = false;
 
     protected FileBlockMetadata()
     {
@@ -227,37 +226,10 @@ public interface BlockMetadata
       return filePath;
     }
 
-    /**
-     * Get if blocks should be read in sequence
-     * @return readBlockInSequence
-     */
-    public boolean isReadBlockInSequence()
-    {
-      return readBlockInSequence;
-    }
-
-    /**
-     * Set if blokcs should be read in sequence
-     * @param readBlockInSequence
-     */
-    public void setReadBlockInSequence(boolean readBlockInSequence)
-    {
-      this.readBlockInSequence = readBlockInSequence;
-    }
-
     public FileBlockMetadata newInstance(@NotNull String filePath)
     {
       Preconditions.checkNotNull(filePath);
       return new FileBlockMetadata(filePath);
     }
-
-    @Override
-    public int hashCode()
-    {
-      if (isReadBlockInSequence()) {
-        return getFilePath().hashCode();
-      }
-      return super.hashCode();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
deleted file mode 100644
index f4f7d76..0000000
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.io.block;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.hadoop.fs.FileSystem;
-
-import com.datatorrent.api.AutoMetric;
-
-/**
- * BlockReader extends {@link FSSliceReader} to accept case insensitive uri
- */
-public class BlockReader extends FSSliceReader
-{
-  @AutoMetric
-  private long bytesRead;
-
-  protected String uri;
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    super.beginWindow(windowId);
-    bytesRead = 0;
-  }
-
-  @Override
-  protected FileSystem getFSInstance() throws IOException
-  {
-    return FileSystem.newInstance(URI.create(uri), configuration);
-  }
-
-  /**
-   * Sets the uri
-   *
-   * @param uri of form hdfs://hostname:port/path/to/input
-   */
-  public void setUri(String uri)
-  {
-    this.uri = uri;
-  }
-
-  public String getUri()
-  {
-    return uri;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java
deleted file mode 100644
index 4318994..0000000
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSFileSplitter.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.io.fs;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.fs.Path;
-import com.datatorrent.api.Context;
-import com.datatorrent.lib.io.block.BlockMetadata;
-
-/**
- * FSFileSplitter extends {@link FileSplitterInput} to,
- * 1. Ignore files with extension "ignoreFilePatternRegularExp"
- * 2. Set sequencial read option on readers.
- */
-public class FSFileSplitter extends FileSplitterInput
-{
-  private boolean sequencialFileRead;
-
-  public FSFileSplitter()
-  {
-    super();
-    super.setScanner(new FSScanner());
-  }
-
-  @Override
-  protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
-  {
-    BlockMetadata.FileBlockMetadata blockMetadta = new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath());
-    blockMetadta.setReadBlockInSequence(sequencialFileRead);
-    return blockMetadta;
-  }
-
-  public boolean isSequencialFileRead()
-  {
-    return sequencialFileRead;
-  }
-
-  public void setSequencialFileRead(boolean sequencialFileRead)
-  {
-    this.sequencialFileRead = sequencialFileRead;
-  }
-
-  /**
-   * FSScanner extends {@link TimeBasedDirectoryScanner} to ignore temporary files
-   * and files containing unsupported characters.
-   */
-  public static class FSScanner extends TimeBasedDirectoryScanner
-  {
-    private String unsupportedCharacter;
-    private String ignoreFilePatternRegularExp;
-    private transient Pattern ignoreRegex;
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      if (ignoreFilePatternRegularExp != null) {
-        ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
-      }
-    }
-
-    @Override
-    protected boolean acceptFile(String filePathStr)
-    {
-      boolean accepted = super.acceptFile(filePathStr);
-      if (containsUnsupportedCharacters(filePathStr) || isIgnoredFile(filePathStr)) {
-        return false;
-      }
-      return accepted;
-    }
-
-    private boolean isIgnoredFile(String filePathStr)
-    {
-      String fileName = new Path(filePathStr).getName();
-      if (ignoreRegex != null) {
-        Matcher matcher = ignoreRegex.matcher(fileName);
-        if (matcher.matches()) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    private boolean containsUnsupportedCharacters(String filePathStr)
-    {
-      if (unsupportedCharacter != null) {
-        return new Path(filePathStr).toUri().getPath().contains(unsupportedCharacter);
-      }
-      return false;
-    }
-
-    public String getIgnoreFilePatternRegularExp()
-    {
-      return ignoreFilePatternRegularExp;
-    }
-
-    public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegularExp)
-    {
-      this.ignoreFilePatternRegularExp = ignoreFilePatternRegularExp;
-      this.ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
-    }
-
-    public String getUnsupportedCharacter()
-    {
-      return unsupportedCharacter;
-    }
-
-    public void setUnsupportedCharacter(String unsupportedCharacter)
-    {
-      this.unsupportedCharacter = unsupportedCharacter;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index e5221a0..deaa0a4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -26,9 +26,10 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Module;
 import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
 import com.datatorrent.lib.io.block.AbstractBlockReader;
 import com.datatorrent.lib.io.block.BlockMetadata;
-import com.datatorrent.lib.io.block.BlockReader;
+import com.datatorrent.lib.io.block.FSSliceReader;
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -44,7 +45,7 @@ import com.datatorrent.netlet.util.Slice;
 * 7. sequencialFileRead: whether to emit the blocks of a file in sequence
  */
 
-public abstract class FSInputModule implements Module
+public class FSInputModule implements Module
 {
   @NotNull
   @Size(min = 1)
@@ -61,15 +62,21 @@ public abstract class FSInputModule implements Module
   public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
   public final transient ProxyOutputPort<AbstractBlockReader.ReaderRecord<Slice>> messages = new ProxyOutputPort<>();
 
-  public abstract FSFileSplitter createFileSplitter();
+  public FileSplitterInput createFileSplitter()
+  {
+    return new FileSplitterInput();
+  }
 
-  public abstract BlockReader createBlockReader();
+  public FSSliceReader createBlockReader()
+  {
+    return new FSSliceReader();
+  }
 
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    FSFileSplitter fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
-    BlockReader blockReader = dag.addOperator("BlockReader", createBlockReader());
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSSliceReader blockReader = dag.addOperator("BlockReader", createBlockReader());
 
     dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, blockReader.blocksMetadataInput);
 
@@ -77,12 +84,15 @@ public abstract class FSInputModule implements Module
     blocksMetadataOutput.set(blockReader.blocksMetadataOutput);
     messages.set(blockReader.messages);
 
-    fileSplitter.setSequencialFileRead(sequencialFileRead);
+    if (sequencialFileRead) {
+      dag.setInputPortAttribute(blockReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
     if (blockSize != 0) {
       fileSplitter.setBlockSize(blockSize);
     }
 
-    FSFileSplitter.FSScanner fileScanner = (FSFileSplitter.FSScanner)fileSplitter.getScanner();
+    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
     fileScanner.setFiles(files);
     if (scanIntervalMillis != 0) {
       fileScanner.setScanIntervalMillis(scanIntervalMillis);
@@ -92,9 +102,9 @@ public abstract class FSInputModule implements Module
       fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
     }
 
-    blockReader.setUri(files);
+    blockReader.setBasePath(files);
     if (readersCount != 0) {
-      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<BlockReader>(readersCount));
+      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount));
       fileSplitter.setBlocksThreshold(readersCount);
     }
   }
@@ -239,4 +249,14 @@ public abstract class FSInputModule implements Module
   {
     this.sequencialFileRead = sequencialFileRead;
   }
+
+  public static class SequentialFileBlockMetadataCodec
+      extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
+  {
+    @Override
+    public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata)
+    {
+      return fileBlockMetadata.hashCode();
+    }
+  }
 }
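
Since FSInputModule is no longer abstract, it can be configured through properties rather than by subclassing. A sketch following the pattern in the new test below; the first three keys appear verbatim in the test, while readersCount and sequencialFileRead are assumed from the module's documented properties:

  Configuration conf = new Configuration(false);
  conf.set("dt.operator.hdfsInputModule.prop.files", "/path/to/input");
  conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
  conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
  conf.set("dt.operator.hdfsInputModule.prop.readersCount", "2");
  conf.set("dt.operator.hdfsInputModule.prop.sequencialFileRead", "true");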

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index b7b674e..0ba07d2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -266,6 +266,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
   {
     private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+    private static String FILE_BEING_COPIED = "_COPYING_";
 
     private boolean recursive;
 
@@ -279,6 +280,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     private long scanIntervalMillis;
     private String filePatternRegularExp;
 
+    private String ignoreFilePatternRegularExp;
+
     protected transient long lastScanMillis;
     protected transient FileSystem fs;
     protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
@@ -287,7 +290,10 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
     private transient volatile boolean running;
     protected final transient HashSet<String> ignoredFiles;
+
     protected transient Pattern regex;
+    private transient Pattern ignoreRegex;
+
     protected transient long sleepMillis;
     protected transient Map<String, Long> referenceTimes;
 
@@ -316,6 +322,10 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       if (filePatternRegularExp != null) {
         regex = Pattern.compile(filePatternRegularExp);
       }
+      if (ignoreFilePatternRegularExp != null) {
+        ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
+      }
+
       try {
         fs = getFSInstance();
       } catch (IOException e) {
@@ -516,12 +526,21 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
      */
     protected boolean acceptFile(String filePathStr)
     {
+      if (fs.getScheme().equalsIgnoreCase("hdfs") && filePathStr.endsWith(FILE_BEING_COPIED)) {
+        return false;
+      }
       if (regex != null) {
         Matcher matcher = regex.matcher(filePathStr);
         if (!matcher.matches()) {
           return false;
         }
       }
+      if (ignoreRegex != null) {
+        Matcher matcher = ignoreRegex.matcher(filePathStr);
+        if (matcher.matches()) {
+          return false;
+        }
+      }
       return true;
     }
 
@@ -556,6 +575,24 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     }
 
     /**
+     * @return the regular expression for ignored files.
+     */
+    public String getIgnoreFilePatternRegularExp()
+    {
+      return ignoreFilePatternRegularExp;
+    }
+
+    /**
+     * Sets the regular expression for files that should be ignored.
+     *
+     * @param ignoreFilePatternRegex regular expression for files that will be ignored.
+     */
+    public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegex)
+    {
+      this.ignoreFilePatternRegularExp = ignoreFilePatternRegex;
+    }
+
+    /**
      * A comma separated list of directories to scan. If the path is not fully qualified the default
      * file system is used. A fully qualified path can be provided to scan directories in other filesystems.
      *
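
Files with the HDFS _COPYING_ suffix are now skipped automatically by the scheme check in acceptFile(), so the new ignore pattern is intended for user-defined exclusions. A short sketch using only setters shown in this diff (path and regex are examples):

  FileSplitterInput splitter = new FileSplitterInput();
  FileSplitterInput.TimeBasedDirectoryScanner scanner = splitter.getScanner();
  scanner.setFiles("hdfs://namenode:8020/path/to/input");
  // Skip temporary files, e.g. anything ending in ".tmp".
  scanner.setIgnoreFilePatternRegularExp(".*\\.tmp");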

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
deleted file mode 100644
index 26f2b6d..0000000
--- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileSplitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.io.fs;
-
-/**
- * HDFSFileSplitter extends {@link FileSplitterInput} to,
- * 1. Add relative path to file metadata.
- * 2. Ignore HDFS temp files (files with extensions _COPYING_).
- * 3. Set sequencial read option on readers.
- */
-public class HDFSFileSplitter extends FSFileSplitter
-{
-  public HDFSFileSplitter()
-  {
-    super();
-    FSFileSplitter.FSScanner scanner = (FSFileSplitter.FSScanner)getScanner();
-    scanner.setIgnoreFilePatternRegularExp(".*._COPYING_");
-    scanner.setUnsupportedCharacter(":");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
deleted file mode 100644
index de99fd3..0000000
--- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSInputModule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.io.fs;
-
-import com.datatorrent.lib.io.block.BlockReader;
-
-/**
- * HDFSInputModule is used to read files/list of files (or directory) from HDFS. <br/>
- * Module emits, <br/>
- * 1. FileMetadata 2. BlockMetadata 3. Block Bytes.<br/><br/>
- * The module reads data in parallel, following parameters can be configured<br/>
- * 1. files: list of file(s)/directories to read<br/>
- * 2. filePatternRegularExp: Files names matching given regex will be read<br/>
- * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
- * 4. recursive: if scan recursively input directories<br/>
- * 5. blockSize: block size used to read input blocks of file<br/>
- * 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: If emit file blocks in sequence?
- */
-public class HDFSInputModule extends FSInputModule
-{
-  @Override
-  public FSFileSplitter createFileSplitter()
-  {
-    return new HDFSFileSplitter();
-  }
-
-  @Override
-  public BlockReader createBlockReader()
-  {
-    return new BlockReader();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
new file mode 100644
index 0000000..55ee090
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
+import com.datatorrent.lib.stream.DevNull;
+import com.datatorrent.netlet.util.Slice;
+
+public class FSInputModuleAppTest
+{
+  private String inputDir;
+  static String outputDir;
+  private StreamingApplication app;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "File one data";
+  private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
+  static final String OUT_DATA_FILE = "fileData.txt";
+  static final String OUT_METADATA_FILE = "fileMetaData.txt";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+    outputDir = testMeta.baseDirectory + File.separator + "output";
+
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+    FileUtils.forceMkdir(new File(inputDir + File.separator + "dir"));
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA);
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(inputDir));
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    app = new Application();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
+    conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
+    conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
+
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    long now = System.currentTimeMillis();
+    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+    FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+    while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
+      Thread.sleep(500);
+      LOG.debug("Waiting for {}", outDir);
+    }
+
+    Thread.sleep(10000);
+    lc.shutdown();
+
+    Assert.assertTrue("output dir does not exist", fs.exists(outDir));
+
+    File dir = new File(outputDir);
+    FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]");
+    verifyFileContents(dir.listFiles(fileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]");
+
+    fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
+    verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
+    verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
+  }
+
+  private void verifyFileContents(File[] files, String expectedData) throws IOException
+  {
+    StringBuilder filesData = new StringBuilder();
+    for (File file : files) {
+      filesData.append(FileUtils.readFileToString(file));
+    }
+    Assert.assertTrue("File data doesn't contain expected text", filesData.indexOf(expectedData) > -1);
+  }
+
+  private static Logger LOG = LoggerFactory.getLogger(FSInputModuleAppTest.class);
+
+  private static class Application implements StreamingApplication
+  {
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSInputModule module = dag.addModule("hdfsInputModule", FSInputModule.class);
+
+      AbstractFileOutputOperator<FileMetadata> metadataWriter = new MetadataWriter(FSInputModuleAppTest.OUT_METADATA_FILE);
+      metadataWriter.setFilePath(FSInputModuleAppTest.outputDir);
+      dag.addOperator("FileMetadataWriter", metadataWriter);
+
+      AbstractFileOutputOperator<ReaderRecord<Slice>> dataWriter = new HDFSFileWriter(FSInputModuleAppTest.OUT_DATA_FILE);
+      dataWriter.setFilePath(FSInputModuleAppTest.outputDir);
+      dag.addOperator("FileDataWriter", dataWriter);
+
+      DevNull<FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
+
+      dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
+      dag.addStream("data", module.messages, dataWriter.input);
+      dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
+    }
+  }
+
+  private static class MetadataWriter extends AbstractFileOutputOperator<FileMetadata>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private MetadataWriter()
+    {
+
+    }
+
+    public MetadataWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(FileMetadata tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(FileMetadata tuple)
+    {
+      return (tuple).toString().getBytes();
+    }
+  }
+
+  private static class HDFSFileWriter extends AbstractFileOutputOperator<ReaderRecord<Slice>>
+  {
+    String fileName;
+
+    @SuppressWarnings("unused")
+    private HDFSFileWriter()
+    {
+    }
+
+    public HDFSFileWriter(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    @Override
+    protected String getFileName(ReaderRecord<Slice> tuple)
+    {
+      return fileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(ReaderRecord<Slice> tuple)
+    {
+      return tuple.getRecord().buffer;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b037ae20/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
deleted file mode 100644
index 8bb1e26..0000000
--- a/library/src/test/java/com/datatorrent/lib/io/fs/HDFSInputModuleAppTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.io.fs;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord;
-import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
-import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
-import com.datatorrent.lib.stream.DevNull;
-import com.datatorrent.netlet.util.Slice;
-
-public class HDFSInputModuleAppTest
-{
-  private String inputDir;
-  static String outputDir;
-  private StreamingApplication app;
-  private static final String FILE_1 = "file1.txt";
-  private static final String FILE_2 = "file2.txt";
-  private static final String FILE_1_DATA = "File one data";
-  private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
-  static final String OUT_DATA_FILE = "fileData.txt";
-  static final String OUT_METADATA_FILE = "fileMetaData.txt";
-
-  public static class TestMeta extends TestWatcher
-  {
-    public String baseDirectory;
-
-    @Override
-    protected void starting(org.junit.runner.Description description)
-    {
-      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
-    }
-
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Before
-  public void setup() throws Exception
-  {
-    inputDir = testMeta.baseDirectory + File.separator + "input";
-    outputDir = testMeta.baseDirectory + File.separator + "output";
-
-    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
-    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
-    FileUtils.forceMkdir(new File(inputDir + File.separator + "dir"));
-    FileUtils.writeStringToFile(new File(inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA);
-  }
-
-  @After
-  public void tearDown() throws IOException
-  {
-    FileUtils.deleteDirectory(new File(inputDir));
-  }
-
-  @Test
-  public void testApplication() throws Exception
-  {
-    app = new Application();
-    Configuration conf = new Configuration(false);
-    conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
-    conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
-    conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
-
-    LocalMode lma = LocalMode.newInstance();
-    lma.prepareDAG(app, conf);
-    LocalMode.Controller lc = lma.getController();
-    lc.setHeartbeatMonitoringEnabled(true);
-    lc.runAsync();
-
-    long now = System.currentTimeMillis();
-    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
-    FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
-    while (!fs.exists(outDir) && System.currentTimeMillis() - now < 20000) {
-      Thread.sleep(500);
-      LOG.debug("Waiting for {}", outDir);
-    }
-
-    Thread.sleep(10000);
-    lc.shutdown();
-
-    Assert.assertTrue("output dir does not exist", fs.exists(outDir));
-
-    File dir = new File(outputDir);
-    FileFilter fileFilter = new WildcardFileFilter(OUT_METADATA_FILE + "*");
-    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
-    verifyFileContents(dir.listFiles(fileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
-    verifyFileContents(dir.listFiles(fileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]");
-    verifyFileContents(dir.listFiles(fileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]");
-
-    fileFilter = new WildcardFileFilter(OUT_DATA_FILE + "*");
-    verifyFileContents(dir.listFiles(fileFilter), FILE_1_DATA);
-    verifyFileContents(dir.listFiles(fileFilter), FILE_2_DATA);
-  }
-
-  private void verifyFileContents(File[] files, String expectedData) throws IOException
-  {
-    StringBuilder filesData = new StringBuilder();
-    for (File file : files) {
-      filesData.append(FileUtils.readFileToString(file));
-    }
-    Assert.assertTrue("File data doesn't contain expected text" , filesData.indexOf(expectedData) > -1);
-  }
-
-  private static Logger LOG = LoggerFactory.getLogger(HDFSInputModuleAppTest.class);
-
-  private static class Application implements StreamingApplication
-  {
-    public void populateDAG(DAG dag, Configuration conf)
-    {
-      HDFSInputModule module = dag.addModule("hdfsInputModule", HDFSInputModule.class);
-
-      AbstractFileOutputOperator<FileMetadata> metadataWriter = new MetadataWriter(HDFSInputModuleAppTest.OUT_METADATA_FILE);
-      metadataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
-      dag.addOperator("FileMetadataWriter", metadataWriter);
-
-      AbstractFileOutputOperator<ReaderRecord<Slice>> dataWriter = new HDFSFileWriter(HDFSInputModuleAppTest.OUT_DATA_FILE);
-      dataWriter.setFilePath(HDFSInputModuleAppTest.outputDir);
-      dag.addOperator("FileDataWriter", dataWriter);
-
-      DevNull<FileBlockMetadata> devNull = dag.addOperator("devNull", DevNull.class);
-
-      dag.addStream("FileMetaData", module.filesMetadataOutput, metadataWriter.input);
-      dag.addStream("data", module.messages, dataWriter.input);
-      dag.addStream("blockMetadata", module.blocksMetadataOutput, devNull.data);
-    }
-  }
-
-  private static class MetadataWriter extends AbstractFileOutputOperator<FileMetadata>
-  {
-    String fileName;
-
-    @SuppressWarnings("unused")
-    private MetadataWriter()
-    {
-
-    }
-
-    public MetadataWriter(String fileName)
-    {
-      this.fileName = fileName;
-    }
-
-    @Override
-    protected String getFileName(FileMetadata tuple)
-    {
-      return fileName;
-    }
-
-    @Override
-    protected byte[] getBytesForTuple(FileMetadata tuple)
-    {
-      return (tuple).toString().getBytes();
-    }
-  }
-
-  private static class HDFSFileWriter extends AbstractFileOutputOperator<ReaderRecord<Slice>>
-  {
-    String fileName;
-
-    @SuppressWarnings("unused")
-    private HDFSFileWriter()
-    {
-    }
-
-    public HDFSFileWriter(String fileName)
-    {
-      this.fileName = fileName;
-    }
-
-    @Override
-    protected String getFileName(ReaderRecord<Slice> tuple)
-    {
-      return fileName;
-    }
-
-    @Override
-    protected byte[] getBytesForTuple(ReaderRecord<Slice> tuple)
-    {
-      return tuple.getRecord().buffer;
-    }
-  }
-
-}



[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2081' of github.com:chandnisingh/incubator-apex-malhar

Posted by pr...@apache.org.
Merge branch 'APEXMALHAR-2081' of github.com:chandnisingh/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/af6075bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/af6075bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/af6075bf

Branch: refs/heads/master
Commit: af6075bfa01fd45fa29d2f331ae93e29a976c5a1
Parents: 2459f6c b037ae2
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Fri May 13 14:10:46 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Fri May 13 14:10:46 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/apps/copy/HDFSFileCopyApp.java  |   4 +-
 .../lib/io/block/AbstractBlockReader.java       |   6 +
 .../lib/io/block/AbstractFSBlockReader.java     |  26 ++-
 .../datatorrent/lib/io/block/BlockMetadata.java |  28 ---
 .../datatorrent/lib/io/block/BlockReader.java   |  66 ------
 .../datatorrent/lib/io/fs/FSFileSplitter.java   | 130 -----------
 .../datatorrent/lib/io/fs/FSInputModule.java    |  40 +++-
 .../lib/io/fs/FileSplitterInput.java            |  37 ++++
 .../datatorrent/lib/io/fs/HDFSFileSplitter.java |  37 ----
 .../datatorrent/lib/io/fs/HDFSInputModule.java  |  49 ----
 .../lib/io/fs/FSInputModuleAppTest.java         | 221 +++++++++++++++++++
 .../lib/io/fs/HDFSInputModuleAppTest.java       | 221 -------------------
 12 files changed, 321 insertions(+), 544 deletions(-)
----------------------------------------------------------------------