Posted to commits@apex.apache.org by ch...@apache.org on 2016/12/22 10:30:25 UTC

apex-malhar git commit: APEXMALHAR-2303 Added S3 Record Reader module

Repository: apex-malhar
Updated Branches:
  refs/heads/master 1816f78fa -> 0500e0ea4


APEXMALHAR-2303 Added S3 Record Reader module


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0500e0ea
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0500e0ea
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0500e0ea

Branch: refs/heads/master
Commit: 0500e0ea4ef335fd653c1eb9d9cebdfb8281faea
Parents: 1816f78
Author: Ajay <aj...@gmail.com>
Authored: Thu Oct 27 18:27:28 2016 +0530
Committer: ajaygit158 <aj...@gmail.com>
Committed: Thu Dec 22 13:01:48 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/BlockMetadata.java |  35 ++
 .../datatorrent/lib/io/block/ReaderContext.java | 116 +++-
 .../lib/io/fs/AbstractFileSplitter.java         |   2 +-
 .../com/datatorrent/lib/io/fs/FileSplitter.java |   3 +-
 .../datatorrent/lib/io/fs/S3BlockReader.java    |   6 +-
 .../apex/malhar/lib/fs/FSRecordReader.java      |  43 +-
 .../malhar/lib/fs/FSRecordReaderModule.java     |  20 +-
 .../apex/malhar/lib/fs/s3/S3RecordReader.java   | 577 +++++++++++++++++++
 .../malhar/lib/fs/s3/S3RecordReaderModule.java  | 137 +++++
 .../lib/io/block/FSLineReaderTest.java          |  61 +-
 .../apex/malhar/lib/fs/FSRecordReaderTest.java  |   9 +-
 .../lib/fs/s3/S3DelimitedRecordReaderTest.java  | 285 +++++++++
 .../lib/fs/s3/S3FixedWidthRecordReaderTest.java | 292 ++++++++++
 .../lib/fs/s3/S3RecordReaderMockTest.java       | 269 +++++++++
 .../lib/fs/s3/S3RecordReaderModuleAppTest.java  | 221 +++++++
 15 files changed, 2037 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 25b3a75..26d19a4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -202,6 +202,7 @@ public interface BlockMetadata
   class FileBlockMetadata extends AbstractBlockMetadata
   {
     private final String filePath;
+    private long fileLength;
 
     protected FileBlockMetadata()
     {
@@ -216,16 +217,50 @@ public interface BlockMetadata
       this.filePath = filePath;
     }
 
+    public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock,
+        long previousBlockId, long fileLength)
+    {
+      super(blockId, offset, length, isLastBlock, previousBlockId);
+      this.filePath = filePath;
+      this.fileLength = fileLength;
+    }
+
     public FileBlockMetadata(String filePath)
     {
       this.filePath = filePath;
     }
 
+    public FileBlockMetadata(String filePath, long fileLength)
+    {
+      this.filePath = filePath;
+      this.fileLength = fileLength;
+    }
+
     public String getFilePath()
     {
       return filePath;
     }
 
+    /**
+     * Returns the length of the file to which this block belongs
+     *
+     * @return length of the file to which this block belongs
+     */
+    public long getFileLength()
+    {
+      return fileLength;
+    }
+
+    /**
+     * Set the length of the file to which this block belongs
+     *
+     * @param fileLength
+     */
+    public void setFileLength(long fileLength)
+    {
+      this.fileLength = fileLength;
+    }
+
     public FileBlockMetadata newInstance(@NotNull String filePath)
     {
       Preconditions.checkNotNull(filePath);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
index 6fe47a2..d9e8b2e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
@@ -146,19 +146,26 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
   {
 
     protected int bufferSize;
+    /**
+     * overflowBufferSize is the number of bytes fetched when a record overflows
+     * into the consecutive block
+     */
+    protected int overflowBufferSize;
 
     private final transient ByteArrayOutputStream lineBuilder;
     private final transient ByteArrayOutputStream emptyBuilder;
     private final transient ByteArrayOutputStream tmpBuilder;
 
-    private transient byte[] buffer;
+    protected transient byte[] buffer;
     private transient String bufferStr;
     private transient int posInStr;
+    private transient boolean overflowBlockRead;
 
     public LineReaderContext()
     {
       super();
       bufferSize = 8192;
+      overflowBufferSize = 8192;
       lineBuilder = new ByteArrayOutputStream();
       emptyBuilder = new ByteArrayOutputStream();
       tmpBuilder = new ByteArrayOutputStream();
@@ -167,10 +174,54 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
     @Override
     public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
     {
+      overflowBlockRead = false;
+      posInStr = 0;
+      offset = blockMetadata.getOffset();
+      super.initialize(stream, blockMetadata, consecutiveBlock);
+    }
+
+    /**
+     * Reads bytes from the stream starting from the offset into the buffer
+     *
+     * @param bytesFromCurrentOffset
+     *          bytes read till now from current block
+     * @param bytesToFetch
+     *          the number of bytes to be read from stream
+     * @return the number of bytes actually read, -1 if 0 bytes read
+     * @throws IOException
+     */
+    protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
+    {
       if (buffer == null) {
-        buffer = new byte[bufferSize];
+        buffer = new byte[bytesToFetch];
       }
-      super.initialize(stream, blockMetadata, consecutiveBlock);
+      return stream.read(offset + bytesFromCurrentOffset, buffer, 0, bytesToFetch);
+    }
+
+    /**
+     * @param usedBytesFromOffset
+     *          number of bytes the pointer is ahead of the offset
+     * @return true if end of stream reached, false otherwise
+     */
+    protected boolean checkEndOfStream(final long usedBytesFromOffset)
+    {
+      if (!overflowBlockRead) {
+        return (offset - blockMetadata.getOffset() + usedBytesFromOffset < bufferSize);
+      } else {
+        return (offset - blockMetadata.getOffset() + usedBytesFromOffset < overflowBufferSize);
+      }
+    }
+
+    /**
+     * Gives the number of bytes to be fetched from the stream, depending on
+     * whether the main block or the overflow block is being read
+     * (as indicated by overflowBlockRead)
+     *
+     * @return bytes to be fetched from the stream
+     */
+    protected int calculateBytesToFetch()
+    {
+      return (overflowBlockRead ? overflowBufferSize : (bufferSize));
     }
 
     @Override
@@ -186,7 +237,9 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
       while (!foundEOL) {
         tmpBuilder.reset();
         if (posInStr == 0) {
-          bytesRead = stream.read(offset + usedBytes, buffer, 0, bufferSize);
+          int bytesToFetch = calculateBytesToFetch();
+          overflowBlockRead = true;
+          bytesRead = readData(usedBytes, bytesToFetch);
           if (bytesRead == -1) {
             break;
           }
@@ -220,14 +273,13 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
           usedBytes += emptyBuilder.toByteArray().length;
         } else {
           //end of stream reached
-          if (bytesRead < bufferSize) {
+          if (checkEndOfStream(usedBytes)) {
             break;
           }
           //read more bytes from the input stream
           posInStr = 0;
         }
       }
-      posInStr = 0;
       //when end of stream is reached then bytesRead is -1
       if (bytesRead == -1) {
         lineBuilder.reset();
@@ -260,6 +312,47 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
     {
       return this.bufferSize;
     }
+
+    /**
+     * Sets the overflow buffer size of read.
+     *
+     * @param overflowBufferSize
+     *          size of the overflow buffer
+     */
+    public void setOverflowBufferSize(int overflowBufferSize)
+    {
+      this.overflowBufferSize = overflowBufferSize;
+    }
+
+    /**
+     * @param buffer
+     *          the bytes read from the source
+     */
+    protected void setBuffer(byte[] buffer)
+    {
+      this.buffer = buffer;
+    }
+
+    /**
+     * Sets whether to read overflow block during next fetch.
+     *
+     * @param overflowBlockRead
+     *          boolean indicating whether to read overflow block during next read
+     */
+    public void setOverflowBlockRead(boolean overflowBlockRead)
+    {
+      this.overflowBlockRead = overflowBlockRead;
+    }
+
+    /**
+     * Returns a boolean indicating whether to read overflow block during next read
+     *
+     * @return overflowBlockRead
+     */
+    protected boolean isOverflowBlockRead()
+    {
+      return overflowBlockRead;
+    }
   }
 
   /**
@@ -280,7 +373,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
       super.initialize(stream, blockMetadata, consecutiveBlock);
       //ignore first entity of  all the blocks except the first one because those bytes
       //were used during the parsing of the previous block.
-      if (!consecutiveBlock && blockMetadata.getOffset() != 0) {
+      if (blockMetadata.getPreviousBlockId() != -1 && blockMetadata.getOffset() != 0) {
         try {
           Entity entity = readEntity();
           offset += entity.usedBytes;
@@ -300,6 +393,15 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
       }
       return null;
     }
+
+    @Override
+    protected int calculateBytesToFetch()
+    {
+      /*
+       * With ReadAheadLineReaderContext, we always read at least one overflow block. Hence, fetch it in advance
+       */
+      return (this.isOverflowBlockRead() ? overflowBufferSize : (bufferSize + overflowBufferSize));
+    }
   }
 
   /**
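
For illustration only (not part of this commit), the new readData/checkEndOfStream/calculateBytesToFetch hooks let subclasses swap the byte source without touching the line-assembly loop in readEntity. A minimal sketch of a context backed by an in-memory byte[] source, which is an assumed stand-in for a ranged store such as S3:

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.fs.FSDataInputStream;

    import com.datatorrent.lib.io.block.ReaderContext;

    public class InMemoryLineReaderContext extends ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>
    {
      private final byte[] source;

      public InMemoryLineReaderContext(byte[] source)
      {
        this.source = source;
      }

      @Override
      protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
      {
        // Copy the requested range out of the in-memory source instead of the stream.
        int start = (int)(offset + bytesFromCurrentOffset);
        if (start >= source.length) {
          return -1;
        }
        buffer = Arrays.copyOfRange(source, start, Math.min(start + bytesToFetch, source.length));
        return buffer.length;
      }

      @Override
      protected boolean checkEndOfStream(final long usedBytesFromOffset)
      {
        // The "stream" ends once the in-memory source is exhausted.
        return (offset + usedBytesFromOffset) >= source.length;
      }
    }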

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index c002c18..74addf7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -178,7 +178,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
    */
   protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
   {
-    return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath());
+    return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getFileLength());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index b9594b3..0949aba 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -293,7 +293,8 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener,
       FileMetadata fileMetadata, boolean isLast)
   {
     return new FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getBlockIds()[blockNumber - 1], pos,
-      lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]);
+        lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2],
+        fileMetadata.getFileLength());
 
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
index 42231bb..e88191c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
@@ -69,7 +69,7 @@ public class S3BlockReader extends FSSliceReader
    * @param s3uri s3 uri
    * @return name of the bucket
    */
-  protected static String extractBucket(String s3uri)
+  public static String extractBucket(String s3uri)
   {
     return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
   }
@@ -79,7 +79,7 @@ public class S3BlockReader extends FSSliceReader
    * @param s3uri given s3 uri
    * @return the accessKey
    */
-  protected static String extractAccessKey(String s3uri)
+  public static String extractAccessKey(String s3uri)
   {
     return s3uri.substring(s3uri.indexOf("://") + 3, s3uri.indexOf(':', s3uri.indexOf("://") + 3));
   }
@@ -89,7 +89,7 @@ public class S3BlockReader extends FSSliceReader
    * @param s3uri given s3uri
    * @return the secretAccessKey
    */
-  protected static String extractSecretAccessKey(String s3uri)
+  public static String extractSecretAccessKey(String s3uri)
   {
     return s3uri.substring(s3uri.indexOf(':', s3uri.indexOf("://") + 1) + 1, s3uri.indexOf('@'));
   }
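
These helpers are made public so that S3RecordReaderModule (added below) can reuse them when configuring the record reader from the files URI. A small usage sketch; the URI below is a placeholder following the documented form SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory:

    String s3uri = "s3n://ACCESS_KEY:SECRET_KEY@my-bucket/input/file1.txt"; // placeholder values
    String bucket = S3BlockReader.extractBucket(s3uri);             // "my-bucket"
    String accessKey = S3BlockReader.extractAccessKey(s3uri);       // "ACCESS_KEY"
    String secretKey = S3BlockReader.extractSecretAccessKey(s3uri); // "SECRET_KEY"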

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
index 268c51b..2d82ac4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
@@ -20,6 +20,9 @@
 package org.apache.apex.malhar.lib.fs;
 
 import java.io.IOException;
+
+import javax.validation.constraints.Pattern;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import com.datatorrent.api.Context.OperatorContext;
@@ -72,15 +75,36 @@ public class FSRecordReader extends FSSliceReader
   {
     super.setup(context);
     if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
-      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
-      fixedBytesReaderContext.setLength(recordLength);
-      readerContext = fixedBytesReaderContext;
+      readerContext = createFixedWidthReaderContext();
     } else {
-      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+      readerContext = createDelimitedReaderContext();
     }
   }
 
   /**
+   * Creates a recordReaderContext for FixedWidthRecords
+   *
+   * @return FixedBytesReaderContext
+   */
+  protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext()
+  {
+    ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+    fixedBytesReaderContext.setLength(recordLength);
+    return fixedBytesReaderContext;
+
+  }
+
+  /**
+   * Creates a recordReaderContext for Delimited Records
+   *
+   * @return DelimitedRecordReaderContext
+   */
+  protected ReaderContext<FSDataInputStream> createDelimitedReaderContext()
+  {
+    return new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+  }
+
+  /**
    * Read the block data and emit records based on reader context
    *
    * @param blockMetadata
@@ -105,14 +129,15 @@ public class FSRecordReader extends FSSliceReader
   }
 
   /**
-   * Criteria for record split
+   * Criteria for record split: FIXED_WIDTH_RECORD or DELIMITED_RECORD
    *
    * @param mode
    *          Mode
    */
-  public void setMode(RECORD_READER_MODE mode)
+  public void setMode(
+      @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode)
   {
-    this.mode = mode;
+    this.mode = RECORD_READER_MODE.valueOf(mode.toUpperCase());
   }
 
   /**
@@ -120,9 +145,9 @@ public class FSRecordReader extends FSSliceReader
    *
    * @return mode
    */
-  public RECORD_READER_MODE getMode()
+  public String getMode()
   {
-    return mode;
+    return mode.toString();
   }
 
   /**
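
With this change the mode is set as a validated, case-insensitive string rather than the enum, which simplifies setting it from string-based configuration. For illustration, both variants (the record length value is only an example):

    FSRecordReader delimitedReader = new FSRecordReader();
    delimitedReader.setMode("delimited_record");     // resolved to RECORD_READER_MODE.DELIMITED_RECORD

    FSRecordReader fixedWidthReader = new FSRecordReader();
    fixedWidthReader.setMode("FIXED_WIDTH_RECORD");
    fixedWidthReader.setRecordLength(10);            // example record length in bytes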

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
index b727248..65f7e5f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
@@ -21,6 +21,7 @@ package org.apache.apex.malhar.lib.fs;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
 import javax.validation.constraints.Size;
 
 import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
@@ -82,7 +83,7 @@ public class FSRecordReaderModule implements Module
   /**
    * Criteria for record split
    */
-  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+  private String mode = RECORD_READER_MODE.DELIMITED_RECORD.toString();
 
   /**
    * Length for fixed width record
@@ -370,24 +371,25 @@ public class FSRecordReaderModule implements Module
   }
 
   /**
-   * Criteria for record split
+   * Criteria for record split: FIXED_WIDTH_RECORD or DELIMITED_RECORD
    *
-   * @return mode
+   * @param mode
+   *          Mode
    */
-  public RECORD_READER_MODE getMode()
+  public void setMode(
+      @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode)
   {
-    return mode;
+    this.mode = mode;
   }
 
   /**
    * Criteria for record split
    *
-   * @param mode
-   *          Mode
+   * @return mode
    */
-  public void setMode(RECORD_READER_MODE mode)
+  public String getMode()
   {
-    this.mode = mode;
+    return mode;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java
new file mode 100644
index 0000000..9de2896
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.esotericsoftware.kryo.NotNull;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from S3 in parallel
+ * (without ordering guarantees between tuples). Records can be delimited (e.g.
+ * newline) or fixed width records. Output tuples are byte[].
+ *
+ * Typically, this operator will be connected to the output of FileSplitterInput
+ * to read records in parallel.
+ */
+@Evolving
+public class S3RecordReader extends FSRecordReader
+{
+  private String endPoint;
+  @NotNull
+  private String bucketName;
+  @NotNull
+  private String accessKey;
+  @NotNull
+  private String secretAccessKey;
+  private int overflowBufferSize;
+
+  public S3RecordReader()
+  {
+    /*
+     * Set default overflowBufferSize to 1MB
+     */
+    overflowBufferSize = 1024 * 1024;
+  }
+
+  /**
+   * S3 reader doesn't make use of any stream, hence returns a null value
+   *
+   * @param block
+   *          block metadata
+   * @return stream (null object)
+   * @throws IOException
+   */
+  @Override
+  protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+  {
+    return null;
+  }
+
+  /**
+   * Returns an instance of S3FixedWidthRecordReaderContext after setting the
+   * recordLength and bucketName and initializing the s3Client
+   *
+   * @return S3FixedWidthRecordReaderContext
+   */
+  @Override
+  protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext()
+  {
+    S3FixedWidthRecordReaderContext fixedBytesReaderContext = new S3FixedWidthRecordReaderContext();
+    fixedBytesReaderContext.setLength(this.getRecordLength());
+    fixedBytesReaderContext.getS3Params().initializeS3Client(accessKey, secretAccessKey, endPoint);
+    fixedBytesReaderContext.getS3Params().setBucketName(bucketName);
+    return fixedBytesReaderContext;
+  }
+
+  /**
+   * Returns an instance of S3DelimitedRecordReaderContext after setting
+   * bucketName and overflowBufferSize and initializing the s3Client
+   *
+   * @return S3DelimitedRecordReaderContext
+   */
+  @Override
+  protected ReaderContext<FSDataInputStream> createDelimitedReaderContext()
+  {
+    S3DelimitedRecordReaderContext delimitedRecordReaderContext = new S3DelimitedRecordReaderContext();
+    delimitedRecordReaderContext.getS3Params().initializeS3Client(accessKey, secretAccessKey, endPoint);
+    delimitedRecordReaderContext.getS3Params().setBucketName(bucketName);
+    delimitedRecordReaderContext.setOverflowBufferSize(overflowBufferSize);
+    return delimitedRecordReaderContext;
+  }
+
+  /**
+   * S3RecordReaderParams is used to hold the common parameters used by the
+   * DelimitedRecordReaderContext and FixedWidthRecordReaderContext for S3
+   */
+  protected static class S3RecordReaderParams
+  {
+    /**
+     * Amazon client used to read bytes from S3
+     */
+    private AmazonS3 s3Client;
+    /**
+     * S3 bucket name
+     */
+    private String bucketName;
+    /**
+     * path of file being processed in bucket
+     */
+    private String filePath;
+    /**
+     * length of the file being processed
+     */
+    private long fileLength;
+
+    /**
+     * Initializes the AmazonS3 client using the accessKey and secretAccessKey, and
+     * sets the endpoint for the s3Client if provided
+     *
+     * @param accessKey
+     * @param secretAccessKey
+     * @param endPoint
+     */
+    public void initializeS3Client(@javax.validation.constraints.NotNull String accessKey,
+        @javax.validation.constraints.NotNull String secretAccessKey, String endPoint)
+    {
+      Preconditions.checkNotNull(accessKey);
+      Preconditions.checkNotNull(secretAccessKey);
+      s3Client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+      if (endPoint != null) {
+        s3Client.setEndpoint(endPoint);
+      }
+    }
+
+    /**
+     * Set the AmazonS3 service
+     *
+     * @param s3Client
+     *          given s3Client
+     */
+    public void setS3Client(@javax.validation.constraints.NotNull AmazonS3 s3Client)
+    {
+      Preconditions.checkNotNull(s3Client);
+      this.s3Client = s3Client;
+    }
+
+    /**
+     * Returns the AmazonS3 service
+     *
+     * @return s3Client
+     */
+    public AmazonS3 getS3Client()
+    {
+      return s3Client;
+    }
+
+    /**
+     * Set the bucket name
+     *
+     * @param bucketName
+     *          given bucketName
+     */
+    public void setBucketName(@javax.validation.constraints.NotNull String bucketName)
+    {
+      Preconditions.checkNotNull(bucketName);
+      this.bucketName = bucketName;
+    }
+
+    /**
+     * Returns the bucket name
+     *
+     * @return bucketName
+     */
+    public String getBucketName()
+    {
+      return bucketName;
+    }
+
+    /**
+     * Returns the file path
+     *
+     * @return filePath
+     */
+    public String getFilePath()
+    {
+      return filePath;
+    }
+
+    /**
+     * Returns the length of the file to which the block belongs
+     *
+     * @return fileLength
+     */
+    public long getFileLength()
+    {
+      return fileLength;
+    }
+
+    /**
+     * This method reads the blockMetadata input parameter and initializes the
+     * fileBlock and fileLength
+     *
+     * @param blockMetadata
+     */
+    public void initialzeFilepathAndFileLength(BlockMetadata blockMetadata)
+    {
+      if (blockMetadata instanceof BlockMetadata.FileBlockMetadata) {
+        BlockMetadata.FileBlockMetadata fileBlockMetadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+        fileLength = fileBlockMetadata.getFileLength();
+        filePath = fileBlockMetadata.getFilePath();
+        // File path would be the path after bucket name.
+        // Check if the file path starts with "/"
+        if (filePath.startsWith("/")) {
+          filePath = filePath.substring(1);
+        }
+      }
+    }
+  }
+
+  /**
+   * RecordReaderContext for reading delimited S3 Records.
+   */
+  protected static class S3DelimitedRecordReaderContext
+      extends ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>
+  {
+    /**
+     * S3 parameters
+     */
+    private transient S3RecordReaderParams s3Params;
+
+    public S3DelimitedRecordReaderContext()
+    {
+      s3Params = new S3RecordReaderParams();
+    }
+
+    @Override
+    public void initialize(FSDataInputStream stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
+    {
+      super.initialize(stream, blockMetadata, consecutiveBlock);
+      s3Params.initialzeFilepathAndFileLength(blockMetadata);
+      /*
+       * Initialize the bufferSize and overflowBufferSize
+       */
+      int bufferSize = Long.valueOf(blockMetadata.getLength() - blockMetadata.getOffset()).intValue();
+      this.setBufferSize(bufferSize);
+      if (overflowBufferSize > bufferSize) {
+        this.setOverflowBufferSize(bufferSize);
+      } else {
+        this.setOverflowBufferSize(overflowBufferSize);
+      }
+    }
+
+    /**
+     * The S3 block read is achieved through the AmazonS3 client. The steps are:
+     * (1) create the objectRequest from bucketName and filePath, (2) set the
+     * range on the objectRequest, (3) get the object portion through the
+     * AmazonS3 client API, and (4) get the object content from that object
+     * portion.
+     *
+     * @param bytesFromCurrentOffset
+     *          bytes read till now from current offset
+     * @param bytesToFetch
+     *          the number of bytes to be fetched
+     * @return the number of bytes read, -1 if 0 bytes read
+     * @throws IOException
+     */
+
+    @Override
+    protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
+    {
+      GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
+      rangeObjectRequest.setRange(offset + bytesFromCurrentOffset, offset + bytesFromCurrentOffset + bytesToFetch - 1);
+      S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
+      S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
+      buffer = ByteStreams.toByteArray(wrappedStream);
+      wrappedStream.close();
+      int bufferLength = buffer.length;
+      if (bufferLength <= 0) {
+        return -1;
+      }
+      return bufferLength;
+    }
+
+    @Override
+    protected boolean checkEndOfStream(final long usedBytesFromOffset)
+    {
+      if ((offset + usedBytesFromOffset) >= s3Params.fileLength) {
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Returns the S3RecordReaderParams object
+     *
+     * @return s3Params
+     */
+    protected S3RecordReaderParams getS3Params()
+    {
+      return s3Params;
+    }
+  }
+
+  /**
+   * RecordReaderContext for reading fixed width S3 Records.
+   */
+  protected static class S3FixedWidthRecordReaderContext
+      extends ReaderContext.FixedBytesReaderContext<FSDataInputStream>
+  {
+    /**
+     * S3 parameters
+     */
+    private transient S3RecordReaderParams s3Params;
+
+    /**
+     * used to hold data retrieved from S3
+     */
+    protected transient byte[] buffer;
+
+    /**
+     * current offset within the byte[] buffer
+     */
+    private transient int bufferOffset;
+
+    public S3FixedWidthRecordReaderContext()
+    {
+      s3Params = new S3RecordReaderParams();
+    }
+
+    @Override
+    public void initialize(FSDataInputStream stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
+    {
+      super.initialize(stream, blockMetadata, consecutiveBlock);
+      s3Params.initialzeFilepathAndFileLength(blockMetadata);
+      try {
+        int bytesRead = this.getBlockFromS3();
+        if (bytesRead == -1) {
+          return;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.setBufferOffset(0);
+    }
+
+    /**
+     * The S3 block read is achieved through the AmazonS3 client. The steps are:
+     * (1) create the objectRequest from bucketName and filePath, (2) set the
+     * range on the objectRequest so that it is aligned with the fixed width
+     * records, (3) get the object portion through the AmazonS3 client API,
+     * and (4) get the object content from that object portion.
+     */
+    protected int getBlockFromS3() throws IOException
+    {
+      long startOffset = blockMetadata.getOffset()
+          + (this.length - (blockMetadata.getOffset() % this.length)) % this.length;
+      long endOffset = blockMetadata.getLength()
+          + ((this.length - (blockMetadata.getLength() % this.length)) % this.length) - 1;
+      if (endOffset == (startOffset - 1) || startOffset > s3Params.fileLength) {
+        /*
+         * If the start and end offsets are the same, it means no record starts in this block
+         */
+        return -1;
+      }
+      if (endOffset >= s3Params.fileLength) {
+        endOffset = s3Params.fileLength - 1;
+      }
+      offset = startOffset;
+      return readData(startOffset, endOffset);
+    }
+
+    /**
+     * Reads data from S3 starting from startOffset till the endOffset and
+     * returns the number of bytes read
+     *
+     * @param startOffset
+     *          offset from where to read
+     * @param endOffset
+     *          offset till where to read
+     * @return number of bytes read
+     * @throws IOException
+     */
+    protected int readData(long startOffset, long endOffset) throws IOException
+    {
+      GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
+      rangeObjectRequest.setRange(startOffset, endOffset);
+      S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
+      S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
+      buffer = ByteStreams.toByteArray(wrappedStream);
+      wrappedStream.close();
+      return buffer.length;
+    }
+
+    @Override
+    protected ReaderContext.Entity readEntity() throws IOException
+    {
+      entity.clear();
+      /*
+       * In case file length is not a multiple of record length, the last record may not have length = recordLength.
+       * The data to be read from buffer array should be less in this case.
+       */
+      long bufferLength = length;
+      if (offset + length > s3Params.fileLength) {
+        bufferLength = s3Params.fileLength - offset;
+      }
+      byte[] record = Arrays.copyOfRange(buffer, Long.valueOf(bufferOffset).intValue(),
+          Long.valueOf(bufferOffset + bufferLength).intValue());
+      bufferOffset += record.length;
+      entity.setRecord(record);
+      entity.setUsedBytes(record.length);
+      return entity;
+    }
+
+    /**
+     * Sets the offset within the current buffer
+     *
+     * @param bufferOffset
+     *          offset within the current buffer
+     */
+    protected void setBufferOffset(int bufferOffset)
+    {
+      this.bufferOffset = bufferOffset;
+    }
+
+    /**
+     * Sets the S3RecordReaderParams object
+     *
+     * @param s3Params
+     *          S3RecordReaderParams object
+     */
+    protected void setS3Params(S3RecordReaderParams s3Params)
+    {
+      this.s3Params = s3Params;
+    }
+
+    /**
+     * Returns the S3RecordReaderParams object
+     *
+     * @return s3Params
+     */
+    protected S3RecordReaderParams getS3Params()
+    {
+      return s3Params;
+    }
+  }
+
+  /**
+   * Number of bytes to be retrieved when a record overflows
+   *
+   * @return overflowBufferSize
+   */
+  public int getOverflowBufferSize()
+  {
+    return overflowBufferSize;
+  }
+
+  /**
+   * Number of bytes to be retrieved when a record overflows
+   *
+   * @param overflowBufferSize
+   */
+  public void setOverflowBufferSize(int overflowBufferSize)
+  {
+    this.overflowBufferSize = overflowBufferSize;
+  }
+
+  /**
+   * Get the S3 bucket name
+   *
+   * @return bucket
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the bucket name where the file resides
+   *
+   * @param bucketName
+   *          bucket name
+   */
+  public void setBucketName(@javax.validation.constraints.NotNull String bucketName)
+  {
+    Preconditions.checkNotNull(bucketName);
+    this.bucketName = bucketName;
+  }
+
+  /**
+   * Return the access key
+   *
+   * @return the accessKey
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set the access key
+   *
+   * @param accessKey
+   *          given accessKey
+   */
+  public void setAccessKey(@javax.validation.constraints.NotNull String accessKey)
+  {
+    Preconditions.checkNotNull(accessKey);
+    this.accessKey = accessKey;
+  }
+
+  /**
+   * Return the secretAccessKey
+   *
+   * @return the secretAccessKey
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Set the secretAccessKey
+   *
+   * @param secretAccessKey
+   *          secretAccessKey
+   */
+  public void setSecretAccessKey(@javax.validation.constraints.NotNull String secretAccessKey)
+  {
+    Preconditions.checkNotNull(secretAccessKey);
+    this.secretAccessKey = secretAccessKey;
+  }
+
+  /**
+   * S3 endpoint
+   *
+   * @param endPoint
+   *          endpoint to be used for S3
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = endPoint;
+  }
+
+  /**
+   * S3 endpoint
+   *
+   * @return s3 endpoint
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+}
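
For illustration only (not part of this commit), a minimal application sketch wiring the new operator to a file splitter, along the lines of the class javadoc above; the bucket name, credentials, and splitter configuration are placeholders:

    import org.apache.apex.malhar.lib.fs.s3.S3RecordReader;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.fs.FileSplitterInput;

    public class S3DelimitedReadApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // Splitter scanning configuration (input directory, scan interval) is omitted here.
        FileSplitterInput splitter = dag.addOperator("S3Splitter", new FileSplitterInput());

        S3RecordReader reader = dag.addOperator("S3RecordReader", new S3RecordReader());
        reader.setBucketName("my-bucket");          // placeholder bucket
        reader.setAccessKey("ACCESS_KEY");          // placeholder credentials
        reader.setSecretAccessKey("SECRET_KEY");
        reader.setMode("delimited_record");         // or "fixed_width_record" with setRecordLength(...)
        reader.setOverflowBufferSize(1024 * 1024);  // default is already 1MB

        // Block metadata emitted by the splitter drives the record reader;
        // reader.records then emits byte[] tuples for downstream operators.
        dag.addStream("blockMetadata", splitter.blocksMetadataOutput, reader.blocksMetadataInput);
      }
    }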

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java
new file mode 100644
index 0000000..884073c
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import javax.validation.constraints.Min;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
+
+import com.datatorrent.lib.io.fs.S3BlockReader;
+
+/**
+ * This module is used for reading records/tuples from S3. Records can be read
+ * in parallel using multiple partitions of record reader operator. (Ordering is
+ * not guaranteed when records are read in parallel)
+ *
+ * Input S3 directory is scanned at specified interval to poll for new data.
+ *
+ * The module reads data in parallel; the following parameters can be configured
+ * <br/>
+ * 1. files: List of file(s)/directories to read. files would be in the form of
+ * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory ,
+ * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory , .... where SCHEME
+ * is the protocol scheme for the file system. AccessKey is the AWS access key
+ * and SecretKey is the AWS Secret Key<br/>
+ * 2. filePatternRegularExp: Files with names matching given regex will be read
+ * <br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * input directory<br/>
+ * 4. recursive: if true, scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of file, default value
+ * 64MB<br/>
+ * 6. overflowBlockSize: For delimited records, this value represents the
+ * additional data that needs to be read to find the delimiter character for
+ * the last record in a block. This should be set to the approximate record
+ * size in the file; default value is 1MB<br/>
+ * 7. sequentialFileRead: if true, then each reader partition will read a
+ * different file instead of reading different offsets of the same file
+ * (file level parallelism instead of block level parallelism)<br/>
+ * 8. blocksThreshold: number of blocks emitted per window<br/>
+ * 9. minReaders: Minimum number of block readers for dynamic partitioning<br/>
+ * 10. maxReaders: Maximum number of block readers for dynamic partitioning<br/>
+ * 11. repartitionCheckInterval: Interval for re-evaluating dynamic
+ * partitioning<br/>
+ * 12. s3EndPoint: Optional parameter used to specify S3 endpoint to use
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3RecordReaderModule extends FSRecordReaderModule
+{
+  /**
+   * Endpoint for S3
+   */
+  private String s3EndPoint;
+  @Min(0)
+  private int overflowBlockSize;
+
+  /**
+   * Creates an instance of Record Reader
+   *
+   * @return S3RecordReader instance
+   */
+  @Override
+  public S3RecordReader createRecordReader()
+  {
+    S3RecordReader s3RecordReader = new S3RecordReader();
+    s3RecordReader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+    s3RecordReader.setAccessKey(S3BlockReader.extractAccessKey(getFiles()));
+    s3RecordReader.setSecretAccessKey(S3BlockReader.extractSecretAccessKey(getFiles()));
+    s3RecordReader.setEndPoint(s3EndPoint);
+    s3RecordReader.setMode(this.getMode());
+    s3RecordReader.setRecordLength(this.getRecordLength());
+    if (overflowBlockSize != 0) {
+      s3RecordReader.setOverflowBufferSize(overflowBlockSize);
+    }
+    return s3RecordReader;
+  }
+
+  /**
+   * Set the S3 endpoint to use
+   *
+   * @param s3EndPoint
+   */
+  public void setS3EndPoint(String s3EndPoint)
+  {
+    this.s3EndPoint = s3EndPoint;
+  }
+
+  /**
+   * Returns the s3 endpoint
+   *
+   * @return s3EndPoint
+   */
+  public String getS3EndPoint()
+  {
+    return s3EndPoint;
+  }
+
+  /**
+   * Additional data that needs to be read to find the delimiter character for
+   * the last record in a block. This should be set to the approximate record
+   * size in the file; default value 1MB
+   *
+   * @param overflowBlockSize
+   */
+  public void setOverflowBlockSize(int overflowBlockSize)
+  {
+    this.overflowBlockSize = overflowBlockSize;
+  }
+
+  /**
+   * returns the overflow block size
+   *
+   * @return overflowBlockSize
+   */
+  public int getOverflowBlockSize()
+  {
+    return overflowBlockSize;
+  }
+}
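
For illustration only, a configuration sketch for the module inside populateDAG(); the files URI is a placeholder of the documented form, and setFiles(...) is assumed to be inherited from FSRecordReaderModule:

    S3RecordReaderModule s3RecordReader = dag.addModule("S3RecordReaderModule", new S3RecordReaderModule());
    s3RecordReader.setFiles("s3n://ACCESS_KEY:SECRET_KEY@my-bucket/input");   // assumed inherited setter
    s3RecordReader.setMode("delimited_record");
    s3RecordReader.setOverflowBlockSize(1024 * 1024);    // roughly the record size, per the javadoc above
    // s3RecordReader.setS3EndPoint("s3.amazonaws.com"); // optional endpoint override (placeholder value)
    // s3RecordReader.records emits byte[] tuples and can be connected downstream via dag.addStream(...)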

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
index 5ddc8a9..2fbb9e7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
@@ -24,6 +24,8 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -166,18 +168,47 @@ public class FSLineReaderTest
 
     testMeta.blockReader.beginWindow(1);
 
-    for (int i = 0; i < noOfBlocks; i++) {
-      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
-          testMeta.dataFile.getAbsolutePath(), i,
-          i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
-          -1);
-      testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) {
+        int blockNo = 10 * j + i;
+        if (blockNo >= noOfBlocks) {
+          continue;
+        }
+        BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+            testMeta.dataFile.getAbsolutePath(), blockNo, blockNo * blockSize,
+            blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize,
+            blockNo == noOfBlocks - 1, blockNo - 1);
+        testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+      }
     }
 
     testMeta.blockReader.endWindow();
 
     List<Object> messages = testMeta.messageSink.collectedTuples;
     Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+
+    Collections.sort(testMeta.messages, new Comparator<String[]>()
+    {
+      @Override
+      public int compare(String[] rec1, String[] rec2)
+      {
+        return compareStringArrayRecords(rec1, rec2);
+      }
+    });
+
+    Collections.sort(messages, new Comparator<Object>()
+    {
+      @Override
+      public int compare(Object object1, Object object2)
+      {
+        @SuppressWarnings("unchecked")
+        String[] rec1 = ((AbstractBlockReader.ReaderRecord<String>)object1).getRecord().split(",");
+        @SuppressWarnings("unchecked")
+        String[] rec2 = ((AbstractBlockReader.ReaderRecord<String>)object2).getRecord().split(",");
+        return compareStringArrayRecords(rec1, rec2);
+      }
+    });
+
     for (int i = 0; i < messages.size(); i++) {
       @SuppressWarnings("unchecked")
       AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
@@ -198,6 +229,24 @@ public class FSLineReaderTest
     }
   }
 
+  /**
+   * Utility function to compare lexicographically 2 records of string arrays
+   *
+   * @param rec1
+   * @param rec2
+   * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise
+   */
+  private int compareStringArrayRecords(String[] rec1, String[] rec2)
+  {
+    for (int i = 0; i < rec1.length && i < rec2.length; i++) {
+      if (rec1[i].equals(rec2[i])) {
+        continue;
+      }
+      return rec1[i].compareTo(rec2[i]);
+    }
+    return 0;
+  }
+
   @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
index 8560228..642621d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
@@ -32,7 +32,6 @@ import org.junit.rules.TestWatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 
@@ -118,6 +117,10 @@ public class FSRecordReaderTest
       }
     };
 
+    public static Set<String> getRecords()
+    {
+      return records;
+    }
   }
 
   private static class DelimitedApplication implements StreamingApplication
@@ -126,7 +129,7 @@ public class FSRecordReaderTest
     public void populateDAG(DAG dag, Configuration conf)
     {
       FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
-      recordReader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);
+      recordReader.setMode("delimited_record");
       DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
       dag.addStream("records", recordReader.records, validator.data);
     }
@@ -206,7 +209,7 @@ public class FSRecordReaderTest
     public void populateDAG(DAG dag, Configuration conf)
     {
       FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
-      recordReader.setMode(RECORD_READER_MODE.FIXED_WIDTH_RECORD);
+      recordReader.setMode("FIXED_WIDTH_RECORD");
       FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
       dag.addStream("records", recordReader.records, validator.data);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java
new file mode 100644
index 0000000..a85e2c8
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.AbstractFSBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.netlet.util.Slice;
+
+@Ignore
+public class S3DelimitedRecordReaderTest
+{
+  private final String accessKey = "*************";
+  private final String secretKey = "*********************";
+  private static final int overflowBufferSize = 123;
+  private static final String FILE_1 = "file1.txt";
+  private static final String s3Directory = "input/";
+
+  AbstractFSBlockReader<Slice> getBlockReader(String bucketKey)
+  {
+    S3RecordReader blockReader = new S3RecordReader();
+
+    blockReader.setAccessKey(accessKey);
+    blockReader.setSecretAccessKey(secretKey);
+    blockReader.setBucketName(bucketKey);
+    blockReader.setOverflowBufferSize(overflowBufferSize);
+    return blockReader;
+  }
+
+  class TestMeta extends TestWatcher
+  {
+    private Context.OperatorContext readerContext;
+    private AbstractFSBlockReader<Slice> blockReader;
+    private CollectorTestSink<Object> blockMetadataSink;
+    private CollectorTestSink<Object> messageSink;
+    private List<String[]> messages = Lists.newArrayList();
+    private String appId;
+    private String dataFilePath;
+    private File dataFile;
+    private String bucketKey;
+    private AmazonS3 client;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      dataFilePath = "src/test/resources/reader_test_data.csv";
+      dataFile = new File(dataFilePath);
+      bucketKey = new String("target-" + description.getMethodName()).toLowerCase();
+
+      client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+      client.createBucket(bucketKey);
+
+      client.putObject(new PutObjectRequest(bucketKey, s3Directory + FILE_1, dataFile));
+
+      appId = Long.toHexString(System.currentTimeMillis());
+      blockReader = getBlockReader(bucketKey);
+
+      Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap();
+      readerAttr.put(DAG.APPLICATION_ID, appId);
+      readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10);
+      readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr);
+
+      blockReader.setup(readerContext);
+
+      messageSink = new CollectorTestSink<>();
+      ((S3RecordReader)blockReader).records.setSink(messageSink);
+
+      blockMetadataSink = new CollectorTestSink<>();
+      blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
+
+      BufferedReader reader;
+      try {
+        reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile.getAbsolutePath())));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          messages.add(line.split(","));
+        }
+        reader.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      deleteBucketAndContent();
+      blockReader.teardown();
+    }
+
+    public void deleteBucketAndContent()
+    {
+      //Get the list of objects
+      ObjectListing objectListing = client.listObjects(bucketKey);
+      for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) {
+        S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next();
+        LOG.info("Deleting an object: {}", objectSummary.getKey());
+        client.deleteObject(bucketKey, objectSummary.getKey());
+      }
+      client.deleteBucket(bucketKey);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * The file is processed as a single block
+   */
+  @Test
+  public void testSingleBlock()
+  {
+    BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, 0L, 0L,
+        testMeta.dataFile.length(), true, -1, testMeta.dataFile.length());
+
+    testMeta.blockReader.beginWindow(1);
+    testMeta.blockReader.blocksMetadataInput.process(block);
+    testMeta.blockReader.endWindow();
+
+    List<Object> actualMessages = testMeta.messageSink.collectedTuples;
+    Assert.assertEquals("No of records", testMeta.messages.size(), actualMessages.size());
+
+    for (int i = 0; i < actualMessages.size(); i++) {
+      byte[] msg = (byte[])actualMessages.get(i);
+      Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i)));
+    }
+  }
+
+  /**
+   * The file is divided into multiple blocks, blocks are processed
+   * consecutively
+   */
+  @Test
+  public void testMultipleBlocks()
+  {
+    long blockSize = 1000;
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+        + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+    testMeta.blockReader.beginWindow(1);
+
+    for (int i = 0; i < noOfBlocks; i++) {
+      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, i,
+          i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
+          i - 1, testMeta.dataFile.length());
+      testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+    }
+
+    testMeta.blockReader.endWindow();
+
+    List<Object> messages = testMeta.messageSink.collectedTuples;
+    Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+    for (int i = 0; i < messages.size(); i++) {
+
+      byte[] msg = (byte[])messages.get(i);
+      Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i)));
+    }
+  }
+
+  /**
+   * The file is divided into multiple blocks which are processed
+   * non-consecutively.
+   */
+  @Test
+  public void testNonConsecutiveBlocks()
+  {
+    long blockSize = 1000;
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+        + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+    testMeta.blockReader.beginWindow(1);
+
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) {
+        int blockNo = 10 * j + i;
+        if (blockNo >= noOfBlocks) {
+          continue;
+        }
+        BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1,
+            blockNo, blockNo * blockSize,
+            blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize,
+            blockNo == noOfBlocks - 1, blockNo - 1, testMeta.dataFile.length());
+        testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+      }
+    }
+
+    testMeta.blockReader.endWindow();
+
+    List<Object> messages = testMeta.messageSink.collectedTuples;
+    Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+
+    Collections.sort(testMeta.messages, new Comparator<String[]>()
+    {
+      @Override
+      public int compare(String[] rec1, String[] rec2)
+      {
+        return compareStringArrayRecords(rec1, rec2);
+      }
+    });
+
+    Collections.sort(messages, new Comparator<Object>()
+    {
+      @Override
+      public int compare(Object object1, Object object2)
+      {
+        String[] rec1 = new String((byte[])object1).split(",");
+        String[] rec2 = new String((byte[])object2).split(",");
+        return compareStringArrayRecords(rec1, rec2);
+      }
+    });
+    for (int i = 0; i < messages.size(); i++) {
+      byte[] msg = (byte[])messages.get(i);
+      Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i)));
+    }
+  }
+
+  /**
+   * Utility function to lexicographically compare two records represented as string arrays
+   *
+   * @param rec1 first record
+   * @param rec2 second record
+   * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise
+   */
+  private int compareStringArrayRecords(String[] rec1, String[] rec2)
+  {
+    for (int i = 0; i < rec1.length && i < rec2.length; i++) {
+      if (rec1[i].equals(rec2[i])) {
+        continue;
+      }
+      return rec1[i].compareTo(rec2[i]);
+    }
+    return 0;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3DelimitedRecordReaderTest.class);
+}
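
For reference, the positional arguments of the FileBlockMetadata constructor used throughout these
tests map as annotated below. This is only a restatement of what testSingleBlock and
testMultipleBlocks pass, with placeholder values; it is not additional API documentation.

    import com.datatorrent.lib.io.block.BlockMetadata;

    long fileLength = 4920L;   // placeholder; the tests use dataFile.length()
    BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(
        "input/file1.txt",     // path of the object within the bucket
        0L,                    // block id
        0L,                    // start offset of the block within the file
        fileLength,            // end of the block; testMultipleBlocks passes (i + 1) * blockSize here
        true,                  // whether this is the last block of the file
        -1,                    // previous block id (-1 for the first block)
        fileLength);           // total length of the file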

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java
new file mode 100644
index 0000000..0584973
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.AbstractFSBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.netlet.util.Slice;
+
+@Ignore
+public class S3FixedWidthRecordReaderTest
+{
+  private final String accessKey = "*************";
+  private final String secretKey = "*********************";
+  private static final int recordLength = 123;
+  private static final String FILE_1 = "file1.txt";
+  private static final String s3Directory = "input/";
+
+  AbstractFSBlockReader<Slice> getBlockReader(String bucketKey)
+  {
+    S3RecordReader blockReader = new S3RecordReader();
+    blockReader.setAccessKey(accessKey);
+    blockReader.setSecretAccessKey(secretKey);
+    blockReader.setBucketName(bucketKey);
+    blockReader.setRecordLength(recordLength);
+    blockReader.setMode("FIXED_WIDTH_RECORD");
+    return blockReader;
+  }
+
+  class TestMeta extends TestWatcher
+  {
+    private Context.OperatorContext readerContext;
+    private AbstractFSBlockReader<Slice> blockReader;
+    private CollectorTestSink<Object> blockMetadataSink;
+    private CollectorTestSink<Object> messageSink;
+    private List<String[]> messages = Lists.newArrayList();
+    private String appId;
+    private String dataFilePath;
+    private File dataFile;
+    private String bucketKey;
+    private AmazonS3 client;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      dataFilePath = "src/test/resources/reader_test_data.csv";
+      dataFile = new File(dataFilePath);
+      bucketKey = ("target-" + description.getMethodName()).toLowerCase();
+
+      client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+      client.createBucket(bucketKey);
+
+      client.putObject(new PutObjectRequest(bucketKey, s3Directory + FILE_1, dataFile));
+
+      appId = Long.toHexString(System.currentTimeMillis());
+      blockReader = getBlockReader(bucketKey);
+
+      Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap();
+      readerAttr.put(DAG.APPLICATION_ID, appId);
+      readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10);
+      readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr);
+
+      blockReader.setup(readerContext);
+
+      messageSink = new CollectorTestSink<>();
+      ((S3RecordReader)blockReader).records.setSink(messageSink);
+
+      blockMetadataSink = new CollectorTestSink<>();
+      blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
+
+      BufferedReader reader;
+      try {
+        reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile.getAbsolutePath())));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          messages.add(line.split(","));
+        }
+        reader.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      deleteBucketAndContent();
+      blockReader.teardown();
+    }
+
+    public void deleteBucketAndContent()
+    {
+      //Get the list of objects
+      ObjectListing objectListing = client.listObjects(bucketKey);
+      for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) {
+        S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next();
+        LOG.info("Deleting an object: {}", objectSummary.getKey());
+        client.deleteObject(bucketKey, objectSummary.getKey());
+      }
+      client.deleteBucket(bucketKey);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  /**
+   * The file is processed as a single block
+   */
+  @Test
+  public void testSingleBlock()
+  {
+    BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, 0L, 0L,
+        testMeta.dataFile.length(), true, -1, testMeta.dataFile.length());
+
+    testMeta.blockReader.beginWindow(1);
+    testMeta.blockReader.blocksMetadataInput.process(block);
+    testMeta.blockReader.endWindow();
+
+    List<Object> actualMessages = testMeta.messageSink.collectedTuples;
+    Assert.assertEquals("No of records", testMeta.messages.size(), actualMessages.size());
+
+    for (int i = 0; i < actualMessages.size(); i++) {
+      byte[] msg = (byte[])actualMessages.get(i);
+      /*
+       * The trailing character is stripped below because testMeta.messages does not
+       * contain the '\n' that is present in byte[] msg.
+       */
+      Assert.assertTrue("line " + i,
+          Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i)));
+    }
+  }
+
+  /**
+   * The file is divided into multiple blocks which are processed
+   * consecutively.
+   */
+  @Test
+  public void testMultipleBlocks()
+  {
+    long blockSize = 1000;
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+        + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+    testMeta.blockReader.beginWindow(1);
+
+    for (int i = 0; i < noOfBlocks; i++) {
+      BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, i,
+          i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
+          i - 1, testMeta.dataFile.length());
+      testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+    }
+
+    testMeta.blockReader.endWindow();
+
+    List<Object> messages = testMeta.messageSink.collectedTuples;
+    Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+    for (int i = 0; i < messages.size(); i++) {
+      byte[] msg = (byte[])messages.get(i);
+      Assert.assertTrue("line " + i,
+          Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i)));
+    }
+  }
+
+  /**
+   * The file is divided into multiple blocks which are processed
+   * non-consecutively.
+   */
+  @Test
+  public void testNonConsecutiveBlocks()
+  {
+    long blockSize = 1000;
+    int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+        + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+    testMeta.blockReader.beginWindow(1);
+
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) {
+        int blockNo = 10 * j + i;
+        if (blockNo >= noOfBlocks) {
+          continue;
+        }
+        BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1,
+            blockNo, blockNo * blockSize,
+            blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize,
+            blockNo == noOfBlocks - 1, blockNo - 1, testMeta.dataFile.length());
+        testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+      }
+    }
+
+    testMeta.blockReader.endWindow();
+
+    List<Object> messages = testMeta.messageSink.collectedTuples;
+    Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+
+    Collections.sort(testMeta.messages, new Comparator<String[]>()
+    {
+      @Override
+      public int compare(String[] rec1, String[] rec2)
+      {
+        return compareStringArrayRecords(rec1, rec2);
+      }
+    });
+
+    Collections.sort(messages, new Comparator<Object>()
+    {
+      @Override
+      public int compare(Object object1, Object object2)
+      {
+        String[] rec1 = new String((byte[])object1).split(",");
+        String[] rec2 = new String((byte[])object2).split(",");
+        return compareStringArrayRecords(rec1, rec2);
+      }
+    });
+    for (int i = 0; i < messages.size(); i++) {
+      byte[] msg = (byte[])messages.get(i);
+      Assert.assertTrue("line " + i,
+          Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i)));
+    }
+  }
+
+  /**
+   * Utility function to lexicographically compare two records represented as string arrays
+   *
+   * @param rec1 first record
+   * @param rec2 second record
+   * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise
+   */
+  private int compareStringArrayRecords(String[] rec1, String[] rec2)
+  {
+    for (int i = 0; i < rec1.length && i < rec2.length; i++) {
+      if (rec1[i].equals(rec2[i])) {
+        continue;
+      }
+      return rec1[i].compareTo(rec2[i]);
+    }
+    return 0;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FixedWidthRecordReaderTest.class);
+}
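
The fixed-width assertions above rely on every line of reader_test_data.csv being exactly
recordLength (123) bytes including its trailing '\n', which is what FIXED_WIDTH_RECORD mode
assumes. Under that assumption the record boundaries are pure arithmetic, as in this small
sketch (fileLength is a placeholder for dataFile.length()):

    int recordLength = 123;                              // bytes per record, newline included
    long fileLength = 12300L;                            // placeholder; the test uses dataFile.length()
    long expectedRecords = fileLength / recordLength;    // records emitted across all blocks
    long startOfRecord5 = 5L * recordLength;             // byte offset where record #5 begins
    long endOfRecord5 = startOfRecord5 + recordLength;   // exclusive end offset of record #5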

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java
new file mode 100644
index 0000000..bca3f33
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.DelimitedValidator;
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.FixedWidthValidator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.ReaderContext;
+import com.datatorrent.lib.io.fs.S3BlockReader;
+
+import static org.mockito.Mockito.mock;
+
+public class S3RecordReaderMockTest
+{
+  private String inputDir;
+  static String outputDir;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
+  private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+    FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+  }
+
+  @Test
+  public void testDelimitedRecords() throws Exception
+  {
+
+    DelimitedApplication app = new DelimitedApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+    expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+    while (DelimitedValidator.getRecords().size() != expectedRecords.size()) {
+      LOG.debug("Waiting for app to finish");
+      Thread.sleep(1000);
+    }
+    lc.shutdown();
+    Assert.assertEquals(expectedRecords, DelimitedValidator.getRecords());
+
+  }
+
+  private static class S3RecordReaderMock extends S3RecordReader
+  {
+    AmazonS3 s3ClientObject;
+
+    @Override
+    protected FSDataInputStream setupStream(FileBlockMetadata block) throws IOException
+    {
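+      // let the parent set up its reader state, then return a stream over the
+      // local copy of the file so the test never actually reads from S3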
+      super.setupStream(block);
+      return fs.open(new Path(block.getFilePath()));
+    }
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      s3ClientObject = mock(AmazonS3.class);
+      super.setup(context);
+    }
+
+    @Override
+    protected ReaderContext<FSDataInputStream> createDelimitedReaderContext()
+    {
+      S3DelimitedRecordReaderContextMock s3DelimitedRecordReaderContextMock = new S3DelimitedRecordReaderContextMock();
+      s3DelimitedRecordReaderContextMock.getS3Params().setBucketName("S3RecordReaderMock");
+      s3DelimitedRecordReaderContextMock.getS3Params().setS3Client(s3ClientObject);
+      return s3DelimitedRecordReaderContextMock;
+    }
+
+    @Override
+    protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext()
+    {
+      S3FixedWidthRecordReaderContextMock s3FixedWidthRecordReaderContextMock = new S3FixedWidthRecordReaderContextMock();
+      s3FixedWidthRecordReaderContextMock.getS3Params().setBucketName("S3RecordReaderMock");
+      s3FixedWidthRecordReaderContextMock.getS3Params().setS3Client(s3ClientObject);
+      s3FixedWidthRecordReaderContextMock.setLength(this.getRecordLength());
+      return s3FixedWidthRecordReaderContextMock;
+    }
+
+    private class S3DelimitedRecordReaderContextMock extends S3DelimitedRecordReaderContext
+    {
+      @Override
+      protected int readData(long bytesFromCurrentOffset, int bytesToFetch) throws IOException
+      {
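+        // mock: serve the ranged read from the local stream instead of an S3 GET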
+        if (buffer == null) {
+          buffer = new byte[bytesToFetch];
+        }
+        return stream.read(offset + bytesFromCurrentOffset, buffer, 0, bytesToFetch);
+      }
+    }
+
+    private static class S3FixedWidthRecordReaderContextMock extends S3FixedWidthRecordReaderContext
+    {
+      @Override
+      protected int readData(long startOffset, long endOffset) throws IOException
+      {
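+        // mock: serve the ranged read from the local stream instead of an S3 GET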
+        int bufferSize = (int)(endOffset - startOffset + 1);
+        if (buffer == null) {
+          buffer = new byte[bufferSize];
+        }
+        return stream.read(startOffset, buffer, 0, bufferSize);
+      }
+    }
+  }
+
+  private static class S3RecordReaderModuleMock extends S3RecordReaderModule
+  {
+    @Override
+    public S3RecordReader createRecordReader()
+    {
+      S3RecordReader s3RecordReader = new S3RecordReaderMock();
+      s3RecordReader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+      s3RecordReader.setAccessKey("****");
+      s3RecordReader.setSecretAccessKey("*****");
+      s3RecordReader.setMode(this.getMode().toString());
+      s3RecordReader.setRecordLength(this.getRecordLength());
+      return s3RecordReader;
+    }
+  }
+
+  private static class DelimitedApplication implements StreamingApplication
+  {
+
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+
+      S3RecordReaderModuleMock recordReader = dag.addModule("S3RecordReaderModuleMock", new S3RecordReaderModuleMock());
+      recordReader.setMode("delimited_record");
+      DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+
+  }
+
+  @Test
+  public void testFixedWidthRecords() throws Exception
+  {
+
+    FixedWidthApplication app = new FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.recordLength", "8");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingRecordLength() throws Exception
+  {
+    FixedWidthApplication app = new FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
+    //Should give IllegalArgumentException since recordLength is not set
+    //conf.set("dt.operator.HDFSRecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
+    conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  private static class FixedWidthApplication implements StreamingApplication
+  {
+
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSRecordReaderModule recordReader = dag.addModule("S3RecordReaderModuleMock", FSRecordReaderModule.class);
+      recordReader.setMode("FIXED_WIDTH_RECORD");
+      FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3RecordReaderMockTest.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java
new file mode 100644
index 0000000..7d69934
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.DelimitedValidator;
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.FixedWidthValidator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+@Ignore
+public class S3RecordReaderModuleAppTest
+{
+  private String inputDir;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
+  private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";
+
+  private final String accessKey = "*************";
+  private final String secretKey = "*********************";
+  private AmazonS3 client;
+  private String files;
+  private static final String SCHEME = "s3n";
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+    public String bucketKey;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+      this.bucketKey = ("target-" + description.getMethodName()).toLowerCase();
+    }
+  }
+
+  @Rule
+  public S3RecordReaderModuleAppTest.TestMeta testMeta = new S3RecordReaderModuleAppTest.TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+    client.createBucket(testMeta.bucketKey);
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+
+    File file1 = new File(inputDir + File.separator + FILE_1);
+    File file2 = new File(inputDir + File.separator + FILE_2);
+
+    FileUtils.writeStringToFile(file1, FILE_1_DATA);
+    FileUtils.writeStringToFile(file2, FILE_2_DATA);
+
+    client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_1, file1));
+    client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_2, file2));
+    files = SCHEME + "://" + accessKey + ":" + secretKey + "@" + testMeta.bucketKey + "/input";
+  }
+
+  @Test
+  public void testS3DelimitedRecords() throws Exception
+  {
+
+    S3DelimitedApplication app = new S3DelimitedApplication();
+    LocalMode lma = LocalMode.newInstance();
+
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.s3RecordReaderModule.prop.files", files);
+    conf.set("dt.operator.s3RecordReaderModule.prop.blockSize", "10");
+    conf.set("dt.operator.s3RecordReaderModule.prop.overflowBlockSize", "4");
+    conf.set("dt.operator.s3RecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+
+    Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+    expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+    while (DelimitedValidator.getRecords().size() != expectedRecords.size()) {
+      LOG.debug("Waiting for app to finish");
+      Thread.sleep(1000);
+    }
+    lc.shutdown();
+    Assert.assertEquals(expectedRecords, DelimitedValidator.getRecords());
+
+  }
+
+  private static class S3DelimitedApplication implements StreamingApplication
+  {
+
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      S3RecordReaderModule recordReader = dag.addModule("s3RecordReaderModule", S3RecordReaderModule.class);
+      DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+
+  }
+
+  @Test
+  public void testS3FixedWidthRecords() throws Exception
+  {
+
+    S3FixedWidthApplication app = new S3FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.S3RecordReaderModule.prop.files", files);
+    conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testS3MissingRecordLength() throws Exception
+  {
+    S3FixedWidthApplication app = new S3FixedWidthApplication();
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    conf.set("dt.operator.S3RecordReaderModule.prop.files", files);
+    //Should give IllegalArgumentException since recordLength is not set
+    //conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8");
+    conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1");
+    conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000");
+
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+    lc.runAsync();
+    LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000 * 1);
+    lc.shutdown();
+  }
+
+  private static class S3FixedWidthApplication implements StreamingApplication
+  {
+
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      S3RecordReaderModule recordReader = dag.addModule("S3RecordReaderModule", S3RecordReaderModule.class);
+      recordReader.setMode("fixed_width_record");
+      FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
+      dag.addStream("records", recordReader.records, validator.data);
+    }
+
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(new File(inputDir));
+    deleteBucketAndContent();
+  }
+
+  public void deleteBucketAndContent()
+  {
+    //Get the list of objects
+    ObjectListing objectListing = client.listObjects(testMeta.bucketKey);
+    for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) {
+      S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next();
+      LOG.info("Deleting an object: {}", objectSummary.getKey());
+      client.deleteObject(testMeta.bucketKey, objectSummary.getKey());
+    }
+    client.deleteBucket(testMeta.bucketKey);
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3RecordReaderModuleAppTest.class);
+}
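
A downstream operator consuming the records port only needs a byte[] input port; the validators
used above follow that shape. The following is a minimal, hypothetical sketch (the class and
logger names are illustrative and not part of this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.common.util.BaseOperator;

    public class RecordLogger extends BaseOperator
    {
      public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
      {
        @Override
        public void process(byte[] record)
        {
          // one tuple per delimited or fixed-width record emitted by the reader
          LOG.info("record: {}", new String(record));
        }
      };

      private static final Logger LOG = LoggerFactory.getLogger(RecordLogger.class);
    }

It would be wired the same way as the validators in the applications above, for example
dag.addStream("records", recordReader.records, logger.data).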