Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/24 08:05:45 UTC

[1/4] incubator-apex-malhar git commit: MLHR-1804 MLHR-1805 MLHR-1806 #resolve #comment Refactoring of file splitter and fix for kryo exception

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 ffc25e57c -> d710af9b1


MLHR-1804 MLHR-1805 MLHR-1806 #resolve #comment Refactoring of file splitter and fix for kryo exception
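
The kryo exception referenced above is likely the usual checkpointing failure: operator state is serialized with Kryo, whose default strategy re-instantiates objects through a no-arg constructor, which is why stateful helpers such as BlockMetadataIterator and FileMetadata carry "//for kryo" no-arg constructors in the diff below. A minimal round-trip sketch of the pattern (a plain Kryo instance standing in for the engine's serializer; class name and path are placeholders):

    import java.io.ByteArrayOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    import com.datatorrent.lib.io.fs.AbstractFileSplitter;

    public class KryoRoundTrip
    {
      public static void main(String[] args)
      {
        // Serialize and deserialize file metadata the way operator state is
        // checkpointed; without a reachable no-arg constructor, Kryo's default
        // strategy cannot re-instantiate the object on deserialization.
        Kryo kryo = new Kryo();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeObject(output, new AbstractFileSplitter.FileMetadata("/tmp/a.txt"));
        output.close();
        AbstractFileSplitter.FileMetadata copy = kryo.readObject(
            new Input(bytes.toByteArray()), AbstractFileSplitter.FileMetadata.class);
        System.out.println(copy.getFilePath());
      }
    }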


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ea202dae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ea202dae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ea202dae

Branch: refs/heads/devel-3
Commit: ea202daed756d8a0ca633ec14c0539451ddc581a
Parents: ffc25e5
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Tue Aug 25 12:08:21 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Sep 22 15:24:51 2015 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  78 ++-
 .../lib/io/fs/AbstractFileSplitter.java         | 539 +++++++++++++++++
 .../com/datatorrent/lib/io/fs/FileSplitter.java |   6 +-
 .../datatorrent/lib/io/fs/FileSplitterBase.java | 142 +++++
 .../lib/io/fs/FileSplitterInput.java            | 594 +++++++++++++++++++
 .../lib/io/fs/FileSplitterBaseTest.java         | 264 +++++++++
 .../lib/io/fs/FileSplitterInputTest.java        | 486 +++++++++++++++
 7 files changed, 2096 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 255a67f..1f47f8f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -5,7 +5,7 @@
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *         http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,10 @@
  */
 package com.datatorrent.lib.io.block;
 
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
 /**
  * Represents the metadata of a block.
  *
@@ -51,8 +55,9 @@ public interface BlockMetadata
   {
     private long offset;
     private long length;
-    private final boolean isLastBlock;
-    private final long previousBlockId;
+    private boolean isLastBlock;
+    private long previousBlockId;
+    private long blockId;
 
     @SuppressWarnings("unused")
     protected AbstractBlockMetadata()
@@ -61,6 +66,7 @@ public interface BlockMetadata
       length = -1;
       isLastBlock = false;
       previousBlockId = -1;
+      blockId = -1;
     }
 
     /**
@@ -71,12 +77,32 @@ public interface BlockMetadata
      * @param isLastBlock     true if this is the last block of file
      * @param previousBlockId id of the previous block
      */
+    @Deprecated
     public AbstractBlockMetadata(long offset, long length, boolean isLastBlock, long previousBlockId)
     {
       this.offset = offset;
       this.length = length;
       this.isLastBlock = isLastBlock;
       this.previousBlockId = previousBlockId;
+      this.blockId = -1;
+    }
+
+    /**
+     * Constructs block metadata
+     *
+     * @param blockId         block id
+     * @param offset          offset of the file in the block
+     * @param length          length of the file in the block
+     * @param isLastBlock     true if this is the last block of file
+     * @param previousBlockId id of the previous block
+     */
+    public AbstractBlockMetadata(long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
+    {
+      this.blockId = blockId;
+      this.offset = offset;
+      this.length = length;
+      this.isLastBlock = isLastBlock;
+      this.previousBlockId = previousBlockId;
     }
 
     @Override
@@ -89,14 +115,14 @@ public interface BlockMetadata
         return false;
       }
 
-      AbstractBlockMetadata that = (AbstractBlockMetadata) o;
+      AbstractBlockMetadata that = (AbstractBlockMetadata)o;
       return getBlockId() == that.getBlockId();
     }
 
     @Override
     public int hashCode()
     {
-      return (int) getBlockId();
+      return (int)getBlockId();
     }
 
     @Override
@@ -133,11 +159,37 @@ public interface BlockMetadata
       return isLastBlock;
     }
 
+    public void setLastBlock(boolean lastBlock)
+    {
+      this.isLastBlock = lastBlock;
+    }
+
     @Override
     public long getPreviousBlockId()
     {
       return previousBlockId;
     }
+
+    /**
+     * Sets the previous block id.
+     *
+     * @param previousBlockId previous block id.
+     */
+    public void setPreviousBlockId(long previousBlockId)
+    {
+      this.previousBlockId = previousBlockId;
+    }
+
+    @Override
+    public long getBlockId()
+    {
+      return blockId;
+    }
+
+    public void setBlockId(long blockId)
+    {
+      this.blockId = blockId;
+    }
   }
 
   /**
@@ -146,31 +198,33 @@ public interface BlockMetadata
   public static class FileBlockMetadata extends AbstractBlockMetadata
   {
     private final String filePath;
-    private final long blockId;
 
     protected FileBlockMetadata()
     {
       super();
       filePath = null;
-      blockId = -1;
     }
 
     public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId)
     {
-      super(offset, length, isLastBlock, previousBlockId);
+      super(blockId, offset, length, isLastBlock, previousBlockId);
       this.filePath = filePath;
-      this.blockId = blockId;
     }
 
-    @Override
-    public long getBlockId()
+    public FileBlockMetadata(String filePath)
     {
-      return blockId;
+      this.filePath = filePath;
     }
 
     public String getFilePath()
     {
       return filePath;
     }
+
+    public FileBlockMetadata newInstance(@NotNull String filePath)
+    {
+      Preconditions.checkNotNull(filePath);
+      return new FileBlockMetadata(filePath);
+    }
   }
 }
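
With blockId and its setters promoted to AbstractBlockMetadata, a block can now be built incrementally instead of only through the full constructor; buildBlockMetadata in AbstractFileSplitter below relies on exactly this. A minimal sketch (path and ids are placeholders):

    import com.datatorrent.lib.io.block.BlockMetadata;

    public class BlockMetadataSketch
    {
      public static void main(String[] args)
      {
        // Start from just the file path, then fill in the block fields.
        BlockMetadata.FileBlockMetadata block =
            new BlockMetadata.FileBlockMetadata("/data/file1.txt");
        block.setBlockId(1L);
        block.setOffset(0L);
        block.setLength(1048576L);
        block.setLastBlock(true);
        block.setPreviousBlockId(-1);
      }
    }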

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
new file mode 100644
index 0000000..b8513ee
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -0,0 +1,539 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.block.BlockMetadata;
+
+/**
+ * An abstract File Splitter.
+ */
+public abstract class AbstractFileSplitter extends BaseOperator
+{
+  protected Long blockSize;
+  private int sequenceNo;
+
+  /**
+   * This is a threshold on the number of blocks emitted per window. Emitting too many
+   * blocks per window can overwhelm the downstream operators; this setting helps to control that.
+   */
+  @Min(1)
+  protected int blocksThreshold;
+
+  protected transient long blockCount;
+
+  protected BlockMetadataIterator blockMetadataIterator;
+
+  protected transient int operatorId;
+  protected transient Context.OperatorContext context;
+  protected transient long currentWindowId;
+
+  @AutoMetric
+  protected int filesProcessed;
+
+  public final transient DefaultOutputPort<FileMetadata> filesMetadataOutput = new DefaultOutputPort<>();
+  public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new DefaultOutputPort<>();
+
+  public AbstractFileSplitter()
+  {
+    blocksThreshold = Integer.MAX_VALUE;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    Preconditions.checkArgument(blockSize == null || blockSize > 0, "invalid block size");
+
+    operatorId = context.getId();
+    this.context = context;
+    currentWindowId = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
+    if (blockSize == null) {
+      blockSize = getDefaultBlockSize();
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    filesProcessed = 0;
+    blockCount = 0;
+    currentWindowId = windowId;
+  }
+
+  protected void process()
+  {
+    if (blockMetadataIterator != null && blockCount < blocksThreshold) {
+      emitBlockMetadata();
+    }
+
+    FileInfo fileInfo;
+    while (blockCount < blocksThreshold && (fileInfo = getFileInfo()) != null) {
+      if (!processFileInfo(fileInfo)) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * @return {@link FileInfo}
+   */
+  protected abstract FileInfo getFileInfo();
+
+  /**
+   * @param fileInfo file info
+   * @return true if blocks threshold is reached; false otherwise
+   */
+  protected boolean processFileInfo(FileInfo fileInfo)
+  {
+    try {
+      FileMetadata fileMetadata = buildFileMetadata(fileInfo);
+      filesMetadataOutput.emit(fileMetadata);
+      filesProcessed++;
+      if (!fileMetadata.isDirectory()) {
+        blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize);
+        if (!emitBlockMetadata()) {
+          //block threshold reached
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("creating metadata", e);
+    }
+  }
+
+  /**
+   * @return true if all the blocks were emitted; false otherwise
+   */
+  protected boolean emitBlockMetadata()
+  {
+    while (blockMetadataIterator.hasNext()) {
+      if (blockCount++ < blocksThreshold) {
+        this.blocksMetadataOutput.emit(blockMetadataIterator.next());
+      } else {
+        return false;
+      }
+    }
+    blockMetadataIterator = null;
+    return true;
+  }
+
+  /**
+   * Builds block metadata
+   *
+   * @param pos                 offset of the block
+   * @param lengthOfFileInBlock length of the block in file
+   * @param blockNumber         block number
+   * @param fileMetadata        file metadata
+   * @param isLast              true if this is the last block of the file
+   * @return the file block metadata
+   */
+  protected BlockMetadata.FileBlockMetadata buildBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber,
+                                                               FileMetadata fileMetadata, boolean isLast)
+  {
+    BlockMetadata.FileBlockMetadata fileBlockMetadata = createBlockMetadata(fileMetadata);
+    fileBlockMetadata.setBlockId(fileMetadata.getBlockIds()[blockNumber - 1]);
+    fileBlockMetadata.setOffset(pos);
+    fileBlockMetadata.setLength(lengthOfFileInBlock);
+    fileBlockMetadata.setLastBlock(isLast);
+    fileBlockMetadata.setPreviousBlockId(blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]);
+
+    return fileBlockMetadata;
+  }
+
+  /**
+   * Can be overridden for creating block metadata of a type that extends {@link BlockMetadata.FileBlockMetadata}
+   */
+  protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
+  {
+    return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath());
+  }
+
+  /**
+   * Creates file-metadata and populates no. of blocks in the metadata.
+   *
+   * @param fileInfo file information
+   * @return file-metadata
+   * @throws IOException
+   */
+  protected FileMetadata buildFileMetadata(FileInfo fileInfo) throws IOException
+  {
+    LOG.debug("file {}", fileInfo.getFilePath());
+    FileMetadata fileMetadata = createFileMetadata(fileInfo);
+    Path path = new Path(fileInfo.getFilePath());
+
+    fileMetadata.setFileName(path.getName());
+
+    FileStatus status = getFileStatus(path);
+    fileMetadata.setDirectory(status.isDirectory());
+    fileMetadata.setFileLength(status.getLen());
+
+    if (!status.isDirectory()) {
+      int noOfBlocks = (int)((status.getLen() / blockSize) + (((status.getLen() % blockSize) == 0) ? 0 : 1));
+      if (fileMetadata.getDataOffset() >= status.getLen()) {
+        noOfBlocks = 0;
+      }
+      fileMetadata.setNumberOfBlocks(noOfBlocks);
+      populateBlockIds(fileMetadata);
+    }
+    return fileMetadata;
+  }
+
+  /**
+   * This can be overridden to create file metadata of a type that extends {@link FileMetadata}
+   *
+   * @param fileInfo file information
+   * @return file-metadata
+   */
+  protected FileMetadata createFileMetadata(FileInfo fileInfo)
+  {
+    return new FileMetadata(fileInfo.getFilePath());
+  }
+
+  protected void populateBlockIds(FileMetadata fileMetadata)
+  {
+    // block ids are 32 bits of operatorId | 32 bits of sequence number
+    long[] blockIds = new long[fileMetadata.getNumberOfBlocks()];
+    long longLeftSide = ((long)operatorId) << 32;
+    for (int i = 0; i < fileMetadata.getNumberOfBlocks(); i++) {
+      blockIds[i] = longLeftSide | sequenceNo++ & 0xFFFFFFFFL;
+    }
+    fileMetadata.setBlockIds(blockIds);
+  }
+
+  /**
+   * Gets the default block size, which is used when the user hasn't specified one.
+   *
+   * @return default block size.
+   */
+  protected abstract long getDefaultBlockSize();
+
+  /**
+   * Get status of a file.
+   *
+   * @param path path of a file
+   * @return file status
+   */
+  protected abstract FileStatus getFileStatus(Path path) throws IOException;
+
+  public void setBlockSize(Long blockSize)
+  {
+    this.blockSize = blockSize;
+  }
+
+  public Long getBlockSize()
+  {
+    return blockSize;
+  }
+
+  public void setBlocksThreshold(int threshold)
+  {
+    this.blocksThreshold = threshold;
+  }
+
+  public int getBlocksThreshold()
+  {
+    return blocksThreshold;
+  }
+
+  /**
+   * An {@link Iterator} over the block metadata of a file.
+   */
+  protected static class BlockMetadataIterator implements Iterator<BlockMetadata.FileBlockMetadata>
+  {
+    private final FileMetadata fileMetadata;
+    private final long blockSize;
+
+    private long pos;
+    private int blockNumber;
+
+    private final transient AbstractFileSplitter splitter;
+
+    protected BlockMetadataIterator()
+    {
+      //for kryo
+      fileMetadata = null;
+      blockSize = -1;
+      splitter = null;
+    }
+
+    protected BlockMetadataIterator(AbstractFileSplitter splitter, FileMetadata fileMetadata, long blockSize)
+    {
+      this.splitter = splitter;
+      this.fileMetadata = fileMetadata;
+      this.blockSize = blockSize;
+      this.pos = fileMetadata.getDataOffset();
+      this.blockNumber = 0;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return pos < fileMetadata.getFileLength();
+    }
+
+    @SuppressWarnings("StatementWithEmptyBody")
+    @Override
+    public BlockMetadata.FileBlockMetadata next()
+    {
+      long length;
+      while ((length = blockSize * ++blockNumber) <= pos) {
+      }
+      boolean isLast = length >= fileMetadata.getFileLength();
+      long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length;
+      BlockMetadata.FileBlockMetadata fileBlock = splitter.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
+      pos = lengthOfFileInBlock;
+      return fileBlock;
+    }
+
+    @Override
+    public void remove()
+    {
+      throw new UnsupportedOperationException("remove not supported");
+    }
+  }
+
+  /**
+   * Represents the file metadata - file path, name, no. of blocks, etc.
+   */
+  public static class FileMetadata
+  {
+    @NotNull
+    private String filePath;
+    private String fileName;
+    private int numberOfBlocks;
+    private long dataOffset;
+    private long fileLength;
+    private long discoverTime;
+    private long[] blockIds;
+    private boolean isDirectory;
+
+    @SuppressWarnings("unused")
+    protected FileMetadata()
+    {
+      //for kryo
+      filePath = null;
+      discoverTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Constructs file metadata
+     *
+     * @param filePath file path
+     */
+    public FileMetadata(@NotNull String filePath)
+    {
+      this.filePath = filePath;
+      discoverTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Returns the total number of blocks.
+     */
+    public int getNumberOfBlocks()
+    {
+      return numberOfBlocks;
+    }
+
+    /**
+     * Sets the total number of blocks.
+     */
+    public void setNumberOfBlocks(int numberOfBlocks)
+    {
+      this.numberOfBlocks = numberOfBlocks;
+    }
+
+    /**
+     * Returns the file name.
+     */
+    public String getFileName()
+    {
+      return fileName;
+    }
+
+    /**
+     * Sets the file name.
+     */
+    public void setFileName(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    /**
+     * Sets the file path.
+     */
+    public void setFilePath(String filePath)
+    {
+      this.filePath = filePath;
+    }
+
+    /**
+     * Returns the file path.
+     */
+    public String getFilePath()
+    {
+      return filePath;
+    }
+
+    /**
+     * Returns the data offset.
+     */
+    public long getDataOffset()
+    {
+      return dataOffset;
+    }
+
+    /**
+     * Sets the data offset.
+     */
+    public void setDataOffset(long offset)
+    {
+      this.dataOffset = offset;
+    }
+
+    /**
+     * Returns the file length.
+     */
+    public long getFileLength()
+    {
+      return fileLength;
+    }
+
+    /**
+     * Sets the file length.
+     */
+    public void setFileLength(long fileLength)
+    {
+      this.fileLength = fileLength;
+    }
+
+    /**
+     * Returns the file discover time.
+     */
+    public long getDiscoverTime()
+    {
+      return discoverTime;
+    }
+
+    /**
+     * Sets the discover time.
+     */
+    public void setDiscoverTime(long discoverTime)
+    {
+      this.discoverTime = discoverTime;
+    }
+
+    /**
+     * Returns the block ids associated with the file.
+     */
+    public long[] getBlockIds()
+    {
+      return blockIds;
+    }
+
+    /**
+     * Sets the block ids of the file.
+     */
+    public void setBlockIds(long[] blockIds)
+    {
+      this.blockIds = blockIds;
+    }
+
+    /**
+     * Sets whether the file is a directory.
+     */
+    public void setDirectory(boolean isDirectory)
+    {
+      this.isDirectory = isDirectory;
+    }
+
+    /**
+     * @return true if it is a directory; false otherwise.
+     */
+    public boolean isDirectory()
+    {
+      return isDirectory;
+    }
+  }
+
+  /**
+   * A class that encapsulates a file path.
+   */
+  public static class FileInfo
+  {
+    protected final String directoryPath;
+    protected final String relativeFilePath;
+
+    protected FileInfo()
+    {
+      directoryPath = null;
+      relativeFilePath = null;
+    }
+
+    public FileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath)
+    {
+      this.directoryPath = directoryPath;
+      this.relativeFilePath = relativeFilePath;
+    }
+
+    /**
+     * @return directory path
+     */
+    public String getDirectoryPath()
+    {
+      return directoryPath;
+    }
+
+    /**
+     * @return path relative to directory
+     */
+    public String getRelativeFilePath()
+    {
+      return relativeFilePath;
+    }
+
+    /**
+     * @return full path of the file
+     */
+    public String getFilePath()
+    {
+      if (directoryPath == null) {
+        return relativeFilePath;
+      }
+      return new Path(directoryPath, relativeFilePath).toUri().getPath();
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSplitter.class);
+}
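
Concrete splitters only supply file discovery and file-system specifics: getFileInfo(), getDefaultBlockSize() and getFileStatus(Path). Block ids pack the operator id into the high 32 bits and a sequence number into the low 32 bits, so operator 3 with sequence 7 yields (3L << 32) | 7 = 12884901895. A minimal subclass sketch, assuming an in-memory queue as the file source (the queue and class name are illustrative):

    import java.io.IOException;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import com.datatorrent.api.Context;
    import com.datatorrent.lib.io.fs.AbstractFileSplitter;

    public class QueueFileSplitter extends AbstractFileSplitter
    {
      private final Queue<FileInfo> pending = new ConcurrentLinkedQueue<>();
      private transient FileSystem fs;

      @Override
      public void setup(Context.OperatorContext context)
      {
        try {
          fs = FileSystem.newInstance(new Configuration());
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        super.setup(context);
      }

      @Override
      protected FileInfo getFileInfo()
      {
        // null tells process() there is nothing more to split right now
        return pending.poll();
      }

      @Override
      protected long getDefaultBlockSize()
      {
        return fs.getDefaultBlockSize(new Path("/"));
      }

      @Override
      protected FileStatus getFileStatus(Path path) throws IOException
      {
        return fs.getFileStatus(path);
      }
    }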

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 58554c3..874d486 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -59,14 +59,15 @@ import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
  * The operator emits block metadata and file metadata.<br/>
  *
  * The file system/directory space should be different for different partitions of file splitter.
- * The scanning of
  *
+ * @deprecated use {@link FileSplitterInput}. This splitter has issues with recovery, and fixing them would break backward compatibility.
  * @displayName File Splitter
  * @category Input
  * @tags file
  * @since 2.0.0
  */
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
+@Deprecated
 public class FileSplitter implements InputOperator, Operator.CheckpointListener
 {
   protected Long blockSize;
@@ -454,6 +455,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   /**
    * Represents the file metadata - file path, name, no. of blocks, etc.
    */
+  @Deprecated
   public static class FileMetadata
   {
     @NotNull
@@ -614,6 +616,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
     }
   }
 
+  @Deprecated
   public static class TimeBasedDirectoryScanner implements Component<Context.OperatorContext>, Runnable
   {
     private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
@@ -939,6 +942,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
   /**
    * A class that represents the file discovered by time-based scanner.
    */
+  @Deprecated
   protected static class FileInfo
   {
     protected final String directoryPath;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java
new file mode 100644
index 0000000..005377c
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterBase.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * A file splitter that receives its input from an upstream operator.
+ */
+public class FileSplitterBase extends AbstractFileSplitter implements Operator.IdleTimeHandler
+{
+  @NotNull
+  protected String file;
+  protected transient FileSystem fs;
+
+  protected final LinkedList<FileInfo> fileInfos;
+  protected transient int sleepTimeMillis;
+
+  public FileSplitterBase()
+  {
+    fileInfos = new LinkedList<>();
+  }
+
+  public final transient DefaultInputPort<FileInfo> input = new DefaultInputPort<FileInfo>()
+  {
+    @Override
+    public void process(FileInfo fileInfo)
+    {
+      fileInfos.add(fileInfo);
+      FileSplitterBase.this.process();
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    try {
+      fs = getFSInstance();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    super.setup(context);
+  }
+
+  protected FileSystem getFSInstance() throws IOException
+  {
+    return FileSystem.newInstance(new Path(file).toUri(), new Configuration());
+  }
+
+  @Override
+  protected FileInfo getFileInfo()
+  {
+    if (fileInfos.size() > 0) {
+      return fileInfos.remove();
+    }
+    return null;
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    if (blockCount < blocksThreshold && (blockMetadataIterator != null || fileInfos.size() > 0)) {
+      process();
+    } else {
+      /* nothing to do here, so sleep for a while to avoid busy loop */
+      try {
+        Thread.sleep(sleepTimeMillis);
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    super.teardown();
+    try {
+      fs.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected long getDefaultBlockSize()
+  {
+    return fs.getDefaultBlockSize(new Path(file));
+  }
+
+  @Override
+  protected FileStatus getFileStatus(Path path) throws IOException
+  {
+    return fs.getFileStatus(path);
+  }
+
+  /**
+   * File path from which the File System is inferred.
+   *
+   * @param file file path
+   */
+  public void setFile(@NotNull String file)
+  {
+    this.file = Preconditions.checkNotNull(file, "file path");
+  }
+
+  /**
+   * @return file path from which the File System is inferred.
+   */
+  public String getFile()
+  {
+    return file;
+  }
+}
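
FileSplitterBase sits downstream of any operator that emits FileInfo tuples; the SplitterApp in FileSplitterBaseTest below wires it end to end. A wiring fragment for populateDAG, assuming a hypothetical upstream operator "input" with a FileInfo output port named "files" (like MockFileInput in the test; paths and thresholds are placeholders):

    FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
    splitter.setFile("/user/data");    // path used to infer the file system
    splitter.setBlocksThreshold(100);  // cap on blocks emitted per window
    dag.addStream("files", input.files, splitter.input);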

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
new file mode 100644
index 0000000..a381be5
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -0,0 +1,594 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Input operator that scans a directory for files and splits a file into blocks.<br/>
+ * The operator emits block metadata and file metadata.<br/>
+ *
+ * The file system/directory space should be different for different partitions of file splitter.
+ * The scanning of directories is done asynchronously by a {@link TimeBasedDirectoryScanner}.
+ *
+ * @displayName File Splitter
+ * @category Input
+ * @tags file
+ * @since 2.0.0
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener
+{
+  @NotNull
+  private IdempotentStorageManager idempotentStorageManager;
+  @NotNull
+  protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
+
+  @NotNull
+  private TimeBasedDirectoryScanner scanner;
+  @NotNull
+  private Map<String, Long> referenceTimes;
+
+  private transient long sleepMillis;
+
+  public FileSplitterInput()
+  {
+    super();
+    currentWindowRecoveryState = Lists.newLinkedList();
+    idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    referenceTimes = Maps.newHashMap();
+    scanner = new TimeBasedDirectoryScanner();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    scanner.setup(context);
+    idempotentStorageManager.setup(context);
+    super.setup(context);
+
+    long largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow();
+    if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
+      scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+      replay(windowId);
+    }
+  }
+
+  protected void replay(long windowId)
+  {
+    try {
+      @SuppressWarnings("unchecked")
+      LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)idempotentStorageManager.load(operatorId, windowId);
+      if (recoveredData == null) {
+        //This could happen when there are multiple physical instances and one of them is ahead in processing windows.
+        return;
+      }
+      if (blockMetadataIterator != null) {
+        emitBlockMetadata();
+      }
+      for (ScannedFileInfo info : recoveredData) {
+        updateReferenceTimes(info);
+        FileMetadata fileMetadata = buildFileMetadata(info);
+        filesMetadataOutput.emit(fileMetadata);
+
+        blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize);
+        if (!emitBlockMetadata()) {
+          break;
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("replay", e);
+    }
+    if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+      scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
+    }
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+      return;
+    }
+
+    Throwable throwable;
+    if ((throwable = scanner.atomicThrowable.get()) != null) {
+      DTThrowable.rethrow(throwable);
+    }
+    if (blockMetadataIterator == null && scanner.discoveredFiles.isEmpty()) {
+      try {
+        Thread.sleep(sleepMillis);
+      } catch (InterruptedException e) {
+        throw new RuntimeException("waiting for work", e);
+      }
+    }
+    process();
+  }
+
+  @Override
+  protected FileInfo getFileInfo()
+  {
+    return scanner.pollFile();
+  }
+
+  @Override
+  protected boolean processFileInfo(FileInfo fileInfo)
+  {
+    ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
+    currentWindowRecoveryState.add(scannedFileInfo);
+    updateReferenceTimes(scannedFileInfo);
+    return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan;
+  }
+
+  protected void updateReferenceTimes(ScannedFileInfo fileInfo)
+  {
+    referenceTimes.put(fileInfo.getFilePath(), fileInfo.modifiedTime);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
+      try {
+        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+      } catch (IOException e) {
+        throw new RuntimeException("saving recovery", e);
+      }
+    }
+    currentWindowRecoveryState.clear();
+  }
+
+  @Override
+  protected long getDefaultBlockSize()
+  {
+    return scanner.fs.getDefaultBlockSize(new Path(scanner.files.iterator().next()));
+  }
+
+  @Override
+  protected FileStatus getFileStatus(Path path) throws IOException
+  {
+    return scanner.fs.getFileStatus(path);
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+  }
+
+  @Override
+  public void committed(long l)
+  {
+    try {
+      idempotentStorageManager.deleteUpTo(operatorId, l);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    scanner.teardown();
+  }
+
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+  {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
+
+  public IdempotentStorageManager getIdempotentStorageManager()
+  {
+    return this.idempotentStorageManager;
+  }
+
+  public void setScanner(TimeBasedDirectoryScanner scanner)
+  {
+    this.scanner = scanner;
+  }
+
+  public TimeBasedDirectoryScanner getScanner()
+  {
+    return this.scanner;
+  }
+
+  public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
+  {
+    private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+
+    private boolean recursive;
+
+    private transient volatile boolean trigger;
+
+    @NotNull
+    @Size(min = 1)
+    private final Set<String> files;
+
+    @Min(0)
+    private long scanIntervalMillis;
+    private String filePatternRegularExp;
+
+    protected transient long lastScanMillis;
+    protected transient FileSystem fs;
+    protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
+    protected final transient ExecutorService scanService;
+    protected final transient AtomicReference<Throwable> atomicThrowable;
+
+    private transient volatile boolean running;
+    protected final transient HashSet<String> ignoredFiles;
+    protected transient Pattern regex;
+    protected transient long sleepMillis;
+    protected transient Map<String, Long> referenceTimes;
+
+    private transient ScannedFileInfo lastScannedInfo;
+
+    public TimeBasedDirectoryScanner()
+    {
+      recursive = true;
+      scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
+      files = Sets.newLinkedHashSet();
+      scanService = Executors.newSingleThreadExecutor();
+      discoveredFiles = new LinkedBlockingDeque<>();
+      atomicThrowable = new AtomicReference<>();
+      ignoredFiles = Sets.newHashSet();
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+      if (filePatternRegularExp != null) {
+        regex = Pattern.compile(filePatternRegularExp);
+      }
+      try {
+        fs = getFSInstance();
+      } catch (IOException e) {
+        throw new RuntimeException("opening fs", e);
+      }
+    }
+
+    protected void startScanning(Map<String, Long> referenceTimes)
+    {
+      this.referenceTimes = Preconditions.checkNotNull(referenceTimes);
+      scanService.submit(this);
+    }
+
+    @Override
+    public void teardown()
+    {
+      running = false;
+      scanService.shutdownNow();
+      try {
+        fs.close();
+      } catch (IOException e) {
+        throw new RuntimeException("closing fs", e);
+      }
+    }
+
+    protected FileSystem getFSInstance() throws IOException
+    {
+      return FileSystem.newInstance(new Path(files.iterator().next()).toUri(), new Configuration());
+    }
+
+    @Override
+    public void run()
+    {
+      running = true;
+      try {
+        while (running) {
+          if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) &&
+            (lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) {
+            trigger = false;
+            for (String afile : files) {
+              scan(new Path(afile), null);
+            }
+            scanComplete();
+          } else {
+            Thread.sleep(sleepMillis);
+          }
+        }
+      } catch (Throwable throwable) {
+        LOG.error("service", throwable);
+        running = false;
+        atomicThrowable.set(throwable);
+        DTThrowable.rethrow(throwable);
+      }
+    }
+
+    /**
+     * Operations that need to be done once a scan is complete.
+     */
+    protected void scanComplete()
+    {
+      LOG.debug("scan complete {}", lastScanMillis);
+      ScannedFileInfo fileInfo = discoveredFiles.peekLast();
+      if (fileInfo != null) {
+        fileInfo.lastFileOfScan = true;
+        lastScannedInfo = fileInfo;
+      }
+      lastScanMillis = System.currentTimeMillis();
+    }
+
+    protected void scan(@NotNull Path filePath, Path rootPath)
+    {
+      try {
+        FileStatus parentStatus = fs.getFileStatus(filePath);
+        String parentPathStr = filePath.toUri().getPath();
+
+        LOG.debug("scan {}", parentPathStr);
+
+        FileStatus[] childStatuses = fs.listStatus(filePath);
+        for (FileStatus status : childStatuses) {
+          Path childPath = status.getPath();
+          ScannedFileInfo info = createScannedFileInfo(filePath, parentStatus, childPath, status, rootPath);
+
+          if (skipFile(childPath, status.getModificationTime(), referenceTimes.get(info.getFilePath()))) {
+            continue;
+          }
+
+          if (status.isDirectory()) {
+            if (recursive) {
+              scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
+            }
+          }
+
+          String childPathStr = childPath.toUri().getPath();
+          if (ignoredFiles.contains(childPathStr)) {
+            continue;
+          }
+          if (acceptFile(childPathStr)) {
+            LOG.debug("found {}", childPathStr);
+            discoveredFiles.add(info);
+          } else {
+            // don't look at it again
+            ignoredFiles.add(childPathStr);
+          }
+        }
+      } catch (FileNotFoundException fnf) {
+        LOG.warn("Failed to list directory {}", filePath, fnf);
+      } catch (IOException e) {
+        throw new RuntimeException("listing files", e);
+      }
+    }
+
+    protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
+    {
+      ScannedFileInfo info;
+      if (rootPath == null) {
+        info = parentStatus.isDirectory() ?
+          new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) :
+          new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime());
+      } else {
+        URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
+        info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(), parentStatus.getModificationTime());
+      }
+      return info;
+    }
+
+    /**
+     * Skips a file/directory based on its modification time.<br/>
+     *
+     * @param path                 file path
+     * @param modificationTime     modification time
+     * @param lastModificationTime last cached directory modification time
+     * @return true to skip; false otherwise.
+     * @throws IOException
+     */
+    protected static boolean skipFile(@SuppressWarnings("unused") @NotNull Path path, @NotNull Long modificationTime,
+                                      Long lastModificationTime) throws IOException
+    {
+      return (!(lastModificationTime == null || modificationTime > lastModificationTime));
+    }
+
+    /**
+     * Accepts file which match a regular pattern.
+     *
+     * @param filePathStr file path
+     * @return true if the path matches the pattern; false otherwise.
+     */
+    protected boolean acceptFile(String filePathStr)
+    {
+      if (regex != null) {
+        Matcher matcher = regex.matcher(filePathStr);
+        if (!matcher.matches()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    public FileInfo pollFile()
+    {
+      return discoveredFiles.poll();
+    }
+
+    /**
+     * Gets the regular expression for file names to split.
+     *
+     * @return regular expression
+     */
+    public String getFilePatternRegularExp()
+    {
+      return filePatternRegularExp;
+    }
+
+    /**
+     * Only files with names matching the given Java regular expression are split.
+     *
+     * @param filePatternRegexp regular expression
+     */
+    public void setFilePatternRegularExp(String filePatternRegexp)
+    {
+      this.filePatternRegularExp = filePatternRegexp;
+    }
+
+    /**
+     * A comma separated list of directories to scan. If the path is not fully qualified the default
+     * file system is used. A fully qualified path can be provided to scan directories in other filesystems.
+     *
+     * @param files files
+     */
+    public void setFiles(String files)
+    {
+      Iterables.addAll(this.files, Splitter.on(",").omitEmptyStrings().split(files));
+    }
+
+    /**
+     * Gets the files to be scanned.
+     *
+     * @return files to be scanned.
+     */
+    public String getFiles()
+    {
+      return Joiner.on(",").join(this.files);
+    }
+
+    /**
+     * Sets whether the scan will be recursive.
+     *
+     * @param recursive true if recursive; false otherwise.
+     */
+    public void setRecursive(boolean recursive)
+    {
+      this.recursive = recursive;
+    }
+
+    /**
+     * Returns whether the scan is recursive.
+     *
+     * @return true if recursive; false otherwise.
+     */
+    public boolean isRecursive()
+    {
+      return this.recursive;
+    }
+
+    /**
+     * Sets the trigger which will initiate a scan.
+     *
+     * @param trigger true to initiate a scan
+     */
+    public void setTrigger(boolean trigger)
+    {
+      this.trigger = trigger;
+    }
+
+    /**
+     * Returns the trigger which will initiate a scan.
+     *
+     * @return trigger
+     */
+    public boolean isTrigger()
+    {
+      return this.trigger;
+    }
+
+    /**
+     * Returns the interval, in milliseconds, at which directories are scanned for new files.
+     *
+     * @return The scan interval in milliseconds.
+     */
+    public long getScanIntervalMillis()
+    {
+      return scanIntervalMillis;
+    }
+
+    /**
+     * Sets the interval, in milliseconds, at which directories are scanned for new files.
+     *
+     * @param scanIntervalMillis The scan interval in milliseconds.
+     */
+    public void setScanIntervalMillis(long scanIntervalMillis)
+    {
+      this.scanIntervalMillis = scanIntervalMillis;
+    }
+  }
+
+  /**
+   * File info created for files discovered by the scanner.
+   */
+  public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo
+  {
+    protected final long modifiedTime;
+    private transient boolean lastFileOfScan;
+
+    private ScannedFileInfo()
+    {
+      super();
+      modifiedTime = -1;
+    }
+
+    public ScannedFileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath, long modifiedTime)
+    {
+      super(directoryPath, relativeFilePath);
+      this.modifiedTime = modifiedTime;
+    }
+
+    protected boolean isLastFileOfScan()
+    {
+      return lastFileOfScan;
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
+}
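
Unlike FileSplitterBase, FileSplitterInput discovers files itself through the TimeBasedDirectoryScanner and replays windows via the IdempotentStorageManager. A configuration sketch (directories, interval and pattern are placeholders; typically done in populateDAG or through properties):

    FileSplitterInput splitter = new FileSplitterInput();
    splitter.getScanner().setFiles("/user/input/dir1,/user/input/dir2"); // comma-separated
    splitter.getScanner().setScanIntervalMillis(10000);                  // rescan every 10 s
    splitter.getScanner().setFilePatternRegularExp(".*\\.txt");          // split only .txt files
    splitter.getScanner().setRecursive(true);
    splitter.setBlocksThreshold(100);                                    // cap blocks per window
    splitter.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());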

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
new file mode 100644
index 0000000..61ccfad
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -0,0 +1,264 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.*;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link FileSplitterBase}
+ */
+public class FileSplitterBaseTest
+{
+  @ClassRule
+  public static FileSplitterInputTest.TestClassMeta classTestMeta = new FileSplitterInputTest.TestClassMeta();
+
+  static class BaseTestMeta extends TestWatcher
+  {
+    public String dataDirectory;
+
+    FileSplitterBase fileSplitter;
+    CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink;
+    CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
+    Set<String> filePaths;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dataDirectory = "target/" + className + "/" + methodName;
+      try {
+        filePaths = FileSplitterInputTest.createData(this.dataDirectory);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      fileSplitter = new FileSplitterBase();
+      fileSplitter.setFile(this.dataDirectory);
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(Context.OperatorContext.SPIN_MILLIS, 500);
+
+      context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
+      fileSplitter.setup(context);
+
+      fileMetadataSink = new CollectorTestSink<>();
+      TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink);
+
+      blockMetadataSink = new CollectorTestSink<>();
+      TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      this.fileSplitter.teardown();
+    }
+  }
+
+  @Rule
+  public BaseTestMeta baseTestMeta = new BaseTestMeta();
+
+  @Test
+  public void testFileMetadata() throws InterruptedException
+  {
+    baseTestMeta.fileSplitter.beginWindow(1);
+    for (String filePath : baseTestMeta.filePaths) {
+      baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+    }
+    baseTestMeta.fileSplitter.endWindow();
+    Assert.assertEquals("File metadata", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
+    for (Object fileMetadata : baseTestMeta.fileMetadataSink.collectedTuples) {
+      FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), baseTestMeta.filePaths.contains(metadata.getFilePath()));
+      Assert.assertNotNull("name: ", metadata.getFileName());
+    }
+
+    baseTestMeta.fileMetadataSink.collectedTuples.clear();
+  }
+
+  @Test
+  public void testBlockMetadataNoSplit() throws InterruptedException
+  {
+    baseTestMeta.fileSplitter.beginWindow(1);
+    for (String filePath : baseTestMeta.filePaths) {
+      baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+    }
+    Assert.assertEquals("Blocks", 12, baseTestMeta.blockMetadataSink.collectedTuples.size());
+    for (Object blockMetadata : baseTestMeta.blockMetadataSink.collectedTuples) {
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), baseTestMeta.filePaths.contains(metadata.getFilePath()));
+    }
+  }
+
+  @Test
+  public void testBlockMetadataWithSplit() throws InterruptedException
+  {
+    baseTestMeta.fileSplitter.setBlockSize(2L);
+    baseTestMeta.fileSplitter.beginWindow(1);
+    for (String filePath : baseTestMeta.filePaths) {
+      baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+    }
+    Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
+
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      FileSplitterInput.FileMetadata fm = baseTestMeta.fileMetadataSink.collectedTuples.get(i);
+      File testFile = new File(baseTestMeta.dataDirectory, fm.getFileName());
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
+    Assert.assertEquals("Blocks", noOfBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testBlocksThreshold() throws InterruptedException
+  {
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      File testFile = new File(baseTestMeta.dataDirectory, "file" + i + ".txt");
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
+
+    baseTestMeta.fileSplitter.setBlockSize(2L);
+    baseTestMeta.fileSplitter.setBlocksThreshold(10);
+    baseTestMeta.fileSplitter.beginWindow(1);
+
+    for (String filePath : baseTestMeta.filePaths) {
+      baseTestMeta.fileSplitter.input.process(new FileSplitterInput.FileInfo(null, filePath));
+    }
+    baseTestMeta.fileSplitter.endWindow();
+
+    Assert.assertEquals("Blocks", 10, baseTestMeta.blockMetadataSink.collectedTuples.size());
+
+    for (int window = 2; window < 8; window++) {
+      baseTestMeta.fileSplitter.beginWindow(window);
+      baseTestMeta.fileSplitter.handleIdleTime();
+      baseTestMeta.fileSplitter.endWindow();
+    }
+
+    Assert.assertEquals("Files", 12, baseTestMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", noOfBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testSplitterInApp() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    SplitterApp app = new SplitterApp();
+    lma.prepareDAG(app, new Configuration());
+    lma.cloneDAG(); // check serialization
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    app.receiver.latch.await();
+    Assert.assertEquals("no. of metadata", 12, app.receiver.count);
+    FileUtils.deleteQuietly(new File("target/SplitterInApp"));
+  }
+
+  @ApplicationAnnotation(name = "TestApp")
+  class SplitterApp implements StreamingApplication
+  {
+    MockReceiver receiver;
+
+    @Override
+    public void populateDAG(DAG dag, Configuration configuration)
+    {
+      dag.setAttribute(DAG.APPLICATION_PATH, "target/SplitterInApp");
+      MockFileInput fileInput = dag.addOperator("Input", new MockFileInput());
+      fileInput.filePaths = baseTestMeta.filePaths;
+
+      FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
+      splitter.setFile(baseTestMeta.dataDirectory);
+
+      receiver = dag.addOperator("Receiver", new MockReceiver());
+
+      dag.addStream("files", fileInput.files, splitter.input);
+      dag.addStream("file-metadata", splitter.filesMetadataOutput, receiver.fileMetadata);
+    }
+  }
+
+  static class MockReceiver extends BaseOperator implements StatsListener
+  {
+    @AutoMetric
+    int count;
+
+    transient CountDownLatch latch = new CountDownLatch(1);
+    public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new DefaultInputPort<FileSplitterInput.FileMetadata>()
+    {
+      @Override
+      public void process(FileSplitterInput.FileMetadata fileMetadata)
+      {
+        count++;
+        LOG.debug("count {}", count);
+      }
+    };
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+      count = (Integer)operatorStats.metrics.get("count");
+      if (count == 12) {
+        latch.countDown();
+      }
+      return null;
+    }
+  }
+
+  static class MockFileInput extends BaseOperator implements InputOperator
+  {
+
+    public final transient DefaultOutputPort<FileSplitterInput.FileInfo> files = new DefaultOutputPort<>();
+
+    protected Set<String> filePaths;
+
+    protected boolean done;
+
+    @Override
+    public void emitTuples()
+    {
+      if (!done) {
+        done = true;
+        for (String file : filePaths) {
+          files.emit(new FileSplitterInput.FileInfo(null, file));
+        }
+      }
+    }
+  }
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(FileSplitterBaseTest.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ea202dae/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
new file mode 100644
index 0000000..8dfea7a
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -0,0 +1,486 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.io.fs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.junit.*;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+/**
+ * Tests for {@link FileSplitterInput}
+ */
+public class FileSplitterInputTest
+{
+
+  public static class TestClassMeta extends TestWatcher
+  {
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  static Set<String> createData(String dataDirectory) throws IOException
+  {
+    Set<String> filePaths = Sets.newHashSet();
+    FileContext.getLocalFSFileContext().delete(new Path(new File(dataDirectory).getAbsolutePath()), true);
+    HashSet<String> allLines = Sets.newHashSet();
+    for (int file = 0; file < 12; file++) {
+      HashSet<String> lines = Sets.newHashSet();
+      for (int line = 0; line < 2; line++) {
+        lines.add("f" + file + "l" + line);
+      }
+      allLines.addAll(lines);
+      File created = new File(dataDirectory, "file" + file + ".txt");
+      filePaths.add(new Path(dataDirectory, created.getName()).toUri().toString());
+      FileUtils.write(created, StringUtils.join(lines, '\n'));
+    }
+    return filePaths;
+  }
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String dataDirectory;
+
+    FileSplitterInput fileSplitterInput;
+    CollectorTestSink<FileSplitterInput.FileMetadata> fileMetadataSink;
+    CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
+    Set<String> filePaths;
+    Context.OperatorContext context;
+    MockScanner scanner;
+
+    @Override
+    protected void starting(Description description)
+    {
+
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      this.dataDirectory = "target/" + className + "/" + methodName + "/data";
+      try {
+        filePaths = createData(this.dataDirectory);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      fileSplitterInput = new FileSplitterInput();
+      scanner = new MockScanner();
+      fileSplitterInput.setScanner(scanner);
+      fileSplitterInput.getScanner().setScanIntervalMillis(500);
+      fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
+      fileSplitterInput.getScanner().setFiles(dataDirectory);
+      fileSplitterInput.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager());
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis()));
+
+      context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);
+      fileSplitterInput.setup(context);
+
+      fileMetadataSink = new CollectorTestSink<>();
+      TestUtils.setSink(fileSplitterInput.filesMetadataOutput, fileMetadataSink);
+
+      blockMetadataSink = new CollectorTestSink<>();
+      TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, blockMetadataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      this.fileSplitterInput.teardown();
+    }
+  }
+
+  @ClassRule
+  public static TestClassMeta classTestMeta = new TestClassMeta();
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testFileMetadata() throws InterruptedException
+  {
+    testMeta.fileSplitterInput.beginWindow(1);
+    testMeta.scanner.semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+    Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size());
+    for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
+      FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+      Assert.assertNotNull("name: ", metadata.getFileName());
+    }
+
+    testMeta.fileMetadataSink.collectedTuples.clear();
+  }
+
+  @Test
+  public void testBlockMetadataNoSplit() throws InterruptedException
+  {
+    testMeta.fileSplitterInput.beginWindow(1);
+    testMeta.scanner.semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
+    for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+    }
+  }
+
+  @Test
+  public void testBlockMetadataWithSplit() throws InterruptedException
+  {
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.beginWindow(1);
+    testMeta.scanner.semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      FileSplitterInput.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i);
+      File testFile = new File(testMeta.dataDirectory, fm.getFileName());
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
+    Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testIdempotency() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+      new IdempotentStorageManager.FSIdempotentStorageManager();
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    //runs window 1, emitting metadata for the files in the data directory
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+    Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
+    for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
+      BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+      Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath()));
+    }
+  }
+
+  @Test
+  public void testTimeScan() throws InterruptedException, IOException, TimeoutException
+  {
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //add a new file to the data directory
+    File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13" + "l" + line);
+    }
+    FileUtils.write(f13, StringUtils.join(lines, '\n'));
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testTrigger() throws InterruptedException, IOException, TimeoutException
+  {
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //add a new file to the data directory
+    File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13" + "l" + line);
+    }
+    FileUtils.write(f13, StringUtils.join(lines, '\n'));
+    testMeta.fileSplitterInput.getScanner().setTrigger(true);
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testBlocksThreshold() throws InterruptedException
+  {
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
+
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.setBlocksThreshold(10);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size());
+
+    for (int window = 2; window < 8; window++) {
+      testMeta.fileSplitterInput.beginWindow(window);
+      testMeta.fileSplitterInput.emitTuples();
+      testMeta.fileSplitterInput.endWindow();
+    }
+
+    Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testIdempotencyWithBlocksThreshold() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+    testMeta.fileSplitterInput.setBlocksThreshold(10);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+    testMeta.fileSplitterInput.setup(testMeta.context);
+
+    testBlocksThreshold();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    for (int i = 1; i < 8; i++) {
+      testMeta.fileSplitterInput.beginWindow(i);
+    }
+    Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 62, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testFirstWindowAfterRecovery() throws IOException, InterruptedException
+  {
+    testIdempotencyWithBlocksThreshold();
+    Thread.sleep(1000);
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 2; line < 4; line++) {
+      lines.add("f13" + "l" + line);
+    }
+    File f13 = new File(testMeta.dataDirectory, "file13" + ".txt");
+
+    FileUtils.writeLines(f13, lines, true);
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.beginWindow(8);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testRecoveryOfPartialFile() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.setBlocksThreshold(2);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+
+    Kryo kryo = new Kryo();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output loutput = new Output(bos);
+    kryo.writeObject(loutput, testMeta.fileSplitterInput);
+    loutput.close();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    //file0.txt has 5 blocks (9 bytes at block size 2); since the blocks threshold is 2, only 2 are emitted in this window.
+    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.teardown();
+
+    //simulate a failure: restore the operator from the state serialized above
+    Input lInput = new Input(bos.toByteArray());
+    testMeta.fileSplitterInput = kryo.readObject(lInput, testMeta.fileSplitterInput.getClass());
+    lInput.close();
+    TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink);
+    TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("Blocks", 4, testMeta.blockMetadataSink.collectedTuples.size());
+
+    String file1 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    testMeta.fileSplitterInput.beginWindow(3);
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("New file", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName();
+
+    Assert.assertTrue("Block file name 0", testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
+    Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
+  }
+
+  @Test
+  public void testRecursive() throws InterruptedException, IOException
+  {
+    testMeta.fileSplitterInput.getScanner().regex = null;
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //add a new file under a child directory so that the recursive scan discovers it
+    File f13 = new File(testMeta.dataDirectory + "/child", "file13" + ".txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13" + "l" + line);
+    }
+    FileUtils.write(f13, StringUtils.join(lines, '\n'));
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+  @Test
+  public void testSingleFile() throws InterruptedException, IOException
+  {
+    testMeta.fileSplitterInput.teardown();
+    testMeta.fileSplitterInput.setScanner(new MockScanner());
+    testMeta.fileSplitterInput.getScanner().regex = null;
+    testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + "/file1.txt");
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+    Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(),
+      testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
+  }
+
+  private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
+  {
+    transient Semaphore semaphore;
+
+    private MockScanner()
+    {
+      super();
+      this.semaphore = new Semaphore(0);
+    }
+
+    @Override
+    protected void scanComplete()
+    {
+      super.scanComplete();
+      if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+        semaphore.release();
+        LOG.debug("discovered {}", discoveredFiles.size());
+      }
+
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class);
+}


[4/4] incubator-apex-malhar git commit: Merge branch 'feature-splitter' of github.com:chandnisingh/incubator-apex-malhar into devel-3

Posted by pr...@apache.org.
Merge branch 'feature-splitter' of github.com:chandnisingh/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d710af9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d710af9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d710af9b

Branch: refs/heads/devel-3
Commit: d710af9b1a5a84de8e179c783bb96c5405af95de
Parents: ffc25e5 a3f1b3c
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Sep 23 22:50:55 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Sep 23 22:50:55 2015 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  78 ++-
 .../lib/io/fs/AbstractFileSplitter.java         | 539 +++++++++++++++++
 .../com/datatorrent/lib/io/fs/FileSplitter.java |   6 +-
 .../datatorrent/lib/io/fs/FileSplitterBase.java | 142 +++++
 .../lib/io/fs/FileSplitterInput.java            | 598 +++++++++++++++++++
 .../lib/io/fs/FileSplitterBaseTest.java         | 264 ++++++++
 .../lib/io/fs/FileSplitterInputTest.java        | 484 +++++++++++++++
 7 files changed, 2098 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-apex-malhar git commit: Removed delimiter

Posted by pr...@apache.org.
Removed delimiter
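
The DELIMITER removed here was a sentinel ScannedFileInfo enqueued after every scan iteration (introduced in f99696af, below) so that processFileInfo() could stop the emit loop at an iteration boundary. Since per-window emission is already bounded by the blocks threshold, the sentinel only forced an identity check on every dequeued element. A minimal sketch of the dropped sentinel-in-queue pattern, with hypothetical names (not the operator's actual code):

import java.util.concurrent.LinkedBlockingDeque;

class SentinelQueueSketch
{
  //identity-compared marker; deliberately a distinct instance, never a real file
  static final String DELIMITER = new String("end-of-scan");

  final LinkedBlockingDeque<String> discovered = new LinkedBlockingDeque<>();

  boolean process(String item)
  {
    if (item == DELIMITER) { //every consumer must special-case the sentinel
      return false;          //stop emitting for this iteration
    }
    //... handle a real discovered file ...
    return true;
  }
}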


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a3f1b3ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a3f1b3ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a3f1b3ca

Branch: refs/heads/devel-3
Commit: a3f1b3cac46e1821f0fcbdb0adc276a56cb447ef
Parents: f99696a
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Sep 23 18:18:26 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 23 18:18:26 2015 -0700

----------------------------------------------------------------------
 .../java/com/datatorrent/lib/io/fs/FileSplitterInput.java     | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a3f1b3ca/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 2560191..92cb97a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -176,9 +176,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   protected boolean processFileInfo(FileInfo fileInfo)
   {
     ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
-    if (scannedFileInfo ==  TimeBasedDirectoryScanner.DELIMITER) {
-      return false;
-    }
     currentWindowRecoveryState.add(scannedFileInfo);
     updateReferenceTimes(scannedFileInfo);
     return super.processFileInfo(fileInfo);
@@ -258,7 +255,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
   {
     private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
-    private static ScannedFileInfo DELIMITER = new ScannedFileInfo();
 
     private boolean recursive;
 
@@ -368,9 +364,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     protected void scanIterationComplete()
     {
       LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
-      if (numDiscoveredPerIteration > 0) {
-        discoveredFiles.add(DELIMITER);
-      }
       lastScanMillis = System.currentTimeMillis();
     }
 


[2/4] incubator-apex-malhar git commit: discoveredFiles getting emptied between a scan and scan-iteration completion

Posted by pr...@apache.org.
discoveredFiles getting emptied between a scan and scan-iteration completion
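
The race: scanComplete() marked the tail of discoveredFiles (peekLast) as lastFileOfScan, but the operator thread drains that queue concurrently, so by the time the scanner inspected the tail the entry could already be gone. The fix counts files as they are discovered (numDiscoveredPerIteration) instead of looking at the queue after the fact. A minimal sketch of why the counter survives a concurrent drain, with hypothetical names (not the operator's actual code):

import java.util.concurrent.LinkedBlockingDeque;

class ScanIterationSketch
{
  final LinkedBlockingDeque<String> discoveredFiles = new LinkedBlockingDeque<>();
  private int numDiscoveredPerIteration; //touched only by the scanner thread

  //scanner thread
  void scanIteration(Iterable<String> files)
  {
    numDiscoveredPerIteration = 0;
    for (String file : files) {
      discoveredFiles.add(file); //the consumer may poll it immediately
      numDiscoveredPerIteration++;
    }
    //racy: discoveredFiles.peekLast() may be null because the consumer already
    //drained the queue; the counter still records what this iteration found
    if (numDiscoveredPerIteration > 0) {
      scanIterationComplete(); //e.g. release a semaphore for a waiting test
    }
  }

  //operator thread
  String next()
  {
    return discoveredFiles.poll();
  }

  void scanIterationComplete()
  {
  }
}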


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f99696af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f99696af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f99696af

Branch: refs/heads/devel-3
Commit: f99696aff2e46cd50d3060480d4d3018ac65e3e1
Parents: ea202da
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 22 23:01:36 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 23 17:48:15 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            | 43 ++++++++++++--------
 .../lib/io/fs/FileSplitterInputTest.java        |  8 ++--
 2 files changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index a381be5..2560191 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -176,9 +176,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   protected boolean processFileInfo(FileInfo fileInfo)
   {
     ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
+    if (scannedFileInfo ==  TimeBasedDirectoryScanner.DELIMITER) {
+      return false;
+    }
     currentWindowRecoveryState.add(scannedFileInfo);
     updateReferenceTimes(scannedFileInfo);
-    return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan;
+    return super.processFileInfo(fileInfo);
   }
 
   protected void updateReferenceTimes(ScannedFileInfo fileInfo)
@@ -255,6 +258,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
   {
     private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+    private static ScannedFileInfo DELIMITER = new ScannedFileInfo();
 
     private boolean recursive;
 
@@ -281,6 +285,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     protected transient Map<String, Long> referenceTimes;
 
     private transient ScannedFileInfo lastScannedInfo;
+    private transient int numDiscoveredPerIteration;
 
     public TimeBasedDirectoryScanner()
     {
@@ -339,10 +344,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
           if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) &&
             (lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) {
             trigger = false;
+            lastScannedInfo = null;
+            numDiscoveredPerIteration = 0;
             for (String afile : files) {
               scan(new Path(afile), null);
             }
-            scanComplete();
+            scanIterationComplete();
           } else {
             Thread.sleep(sleepMillis);
           }
@@ -358,13 +365,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     /**
      * Operations that need to be done once a scan is complete.
      */
-    protected void scanComplete()
+    protected void scanIterationComplete()
     {
-      LOG.debug("scan complete {}", lastScanMillis);
-      ScannedFileInfo fileInfo = discoveredFiles.peekLast();
-      if (fileInfo != null) {
-        fileInfo.lastFileOfScan = true;
-        lastScannedInfo = fileInfo;
+      LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
+      if (numDiscoveredPerIteration > 0) {
+        discoveredFiles.add(DELIMITER);
       }
       lastScanMillis = System.currentTimeMillis();
     }
@@ -398,7 +403,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
           }
           if (acceptFile(childPathStr)) {
             LOG.debug("found {}", childPathStr);
-            discoveredFiles.add(info);
+            processDiscoveredFile(info);
           } else {
             // don't look at it again
             ignoredFiles.add(childPathStr);
@@ -411,6 +416,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       }
     }
 
+    protected void processDiscoveredFile(ScannedFileInfo info)
+    {
+      numDiscoveredPerIteration++;
+      lastScannedInfo = info;
+      discoveredFiles.add(info);
+    }
+
     protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
     {
       ScannedFileInfo info;
@@ -462,6 +474,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       return discoveredFiles.poll();
     }
 
+    protected int getNumDiscoveredPerIteration()
+    {
+      return numDiscoveredPerIteration;
+    }
+
     /**
      * Gets the regular expression for file names to split.
      *
@@ -570,9 +587,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo
   {
     protected final long modifiedTime;
-    private transient boolean lastFileOfScan;
 
-    private ScannedFileInfo()
+    protected ScannedFileInfo()
     {
       super();
       modifiedTime = -1;
@@ -583,11 +599,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       super(directoryPath, relativeFilePath);
       this.modifiedTime = modifiedTime;
     }
-
-    protected boolean isLastFileOfScan()
-    {
-      return lastFileOfScan;
-    }
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 8dfea7a..1e2a25c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -471,14 +471,12 @@ public class FileSplitterInputTest
     }
 
     @Override
-    protected void scanComplete()
+    protected void scanIterationComplete()
     {
-      super.scanComplete();
-      if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+      if (getNumDiscoveredPerIteration() > 0) {
         semaphore.release();
-        LOG.debug("discovered {}", discoveredFiles.size());
       }
-
+      super.scanIterationComplete();
     }
   }