Posted to commits@apex.apache.org by ch...@apache.org on 2016/12/22 10:30:25 UTC
apex-malhar git commit: APEXMALHAR-2303 Added S3 Record Reader module
Repository: apex-malhar
Updated Branches:
refs/heads/master 1816f78fa -> 0500e0ea4
APEXMALHAR-2303 Added S3 Record Reader module
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0500e0ea
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0500e0ea
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0500e0ea
Branch: refs/heads/master
Commit: 0500e0ea4ef335fd653c1eb9d9cebdfb8281faea
Parents: 1816f78
Author: Ajay <aj...@gmail.com>
Authored: Thu Oct 27 18:27:28 2016 +0530
Committer: ajaygit158 <aj...@gmail.com>
Committed: Thu Dec 22 13:01:48 2016 +0530
----------------------------------------------------------------------
.../datatorrent/lib/io/block/BlockMetadata.java | 35 ++
.../datatorrent/lib/io/block/ReaderContext.java | 116 +++-
.../lib/io/fs/AbstractFileSplitter.java | 2 +-
.../com/datatorrent/lib/io/fs/FileSplitter.java | 3 +-
.../datatorrent/lib/io/fs/S3BlockReader.java | 6 +-
.../apex/malhar/lib/fs/FSRecordReader.java | 43 +-
.../malhar/lib/fs/FSRecordReaderModule.java | 20 +-
.../apex/malhar/lib/fs/s3/S3RecordReader.java | 577 +++++++++++++++++++
.../malhar/lib/fs/s3/S3RecordReaderModule.java | 137 +++++
.../lib/io/block/FSLineReaderTest.java | 61 +-
.../apex/malhar/lib/fs/FSRecordReaderTest.java | 9 +-
.../lib/fs/s3/S3DelimitedRecordReaderTest.java | 285 +++++++++
.../lib/fs/s3/S3FixedWidthRecordReaderTest.java | 292 ++++++++++
.../lib/fs/s3/S3RecordReaderMockTest.java | 269 +++++++++
.../lib/fs/s3/S3RecordReaderModuleAppTest.java | 221 +++++++
15 files changed, 2037 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
index 25b3a75..26d19a4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java
@@ -202,6 +202,7 @@ public interface BlockMetadata
class FileBlockMetadata extends AbstractBlockMetadata
{
private final String filePath;
+ private long fileLength;
protected FileBlockMetadata()
{
@@ -216,16 +217,50 @@ public interface BlockMetadata
this.filePath = filePath;
}
+ public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock,
+ long previousBlockId, long fileLength)
+ {
+ super(blockId, offset, length, isLastBlock, previousBlockId);
+ this.filePath = filePath;
+ this.fileLength = fileLength;
+ }
+
public FileBlockMetadata(String filePath)
{
this.filePath = filePath;
}
+ public FileBlockMetadata(String filePath, long fileLength)
+ {
+ this.filePath = filePath;
+ this.fileLength = fileLength;
+ }
+
public String getFilePath()
{
return filePath;
}
+ /**
+ * Returns the length of the file to which this block belongs
+ *
+ * @return length of the file to which this block belongs
+ */
+ public long getFileLength()
+ {
+ return fileLength;
+ }
+
+ /**
+ * Set the length of the file to which this block belongs
+ *
+ * @param fileLength
+ */
+ public void setFileLength(long fileLength)
+ {
+ this.fileLength = fileLength;
+ }
+
public FileBlockMetadata newInstance(@NotNull String filePath)
{
Preconditions.checkNotNull(filePath);
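A note on usage (not part of the commit): with the new constructor, a splitter can propagate the file length with every block, so downstream readers can detect end of file without extra filesystem calls. A minimal sketch; paths and sizes are made up:

    // 2500-byte file split into two blocks; "length" is the block's end offset,
    // matching how FileSplitter and the tests in this commit use it.
    long fileLength = 2500L;
    BlockMetadata.FileBlockMetadata first = new BlockMetadata.FileBlockMetadata(
        "/input/file1.txt", 1L, 0L, 2000L, false, -1L, fileLength);
    BlockMetadata.FileBlockMetadata last = new BlockMetadata.FileBlockMetadata(
        "/input/file1.txt", 2L, 2000L, 2500L, true, 1L, fileLength);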
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
index 6fe47a2..d9e8b2e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
@@ -146,19 +146,26 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
{
protected int bufferSize;
+ /**
+ * overflowBufferSize is the number of bytes fetched when a record overflows
+ * into the next block
+ */
+ protected int overflowBufferSize;
private final transient ByteArrayOutputStream lineBuilder;
private final transient ByteArrayOutputStream emptyBuilder;
private final transient ByteArrayOutputStream tmpBuilder;
- private transient byte[] buffer;
+ protected transient byte[] buffer;
private transient String bufferStr;
private transient int posInStr;
+ private transient boolean overflowBlockRead;
public LineReaderContext()
{
super();
bufferSize = 8192;
+ overflowBufferSize = 8192;
lineBuilder = new ByteArrayOutputStream();
emptyBuilder = new ByteArrayOutputStream();
tmpBuilder = new ByteArrayOutputStream();
@@ -167,10 +174,54 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
@Override
public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
{
+ overflowBlockRead = false;
+ posInStr = 0;
+ offset = blockMetadata.getOffset();
+ super.initialize(stream, blockMetadata, consecutiveBlock);
+ }
+
+ /**
+ * Reads bytes from the stream starting from the offset into the buffer
+ *
+ * @param bytesFromCurrentOffset
+ * bytes read till now from current block
+ * @param bytesToFetch
+ * the number of bytes to be read from stream
+ * @return the number of bytes actually read, -1 if 0 bytes read
+ * @throws IOException
+ */
+ protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
+ {
if (buffer == null) {
- buffer = new byte[bufferSize];
+ buffer = new byte[bytesToFetch];
}
- super.initialize(stream, blockMetadata, consecutiveBlock);
+ return stream.read(offset + bytesFromCurrentOffset, buffer, 0, bytesToFetch);
+ }
+
+ /**
+ * @param usedBytesFromOffset
+ * number of bytes the pointer is ahead of the offset
+ * @return true if end of stream reached, false otherwise
+ */
+ protected boolean checkEndOfStream(final long usedBytesFromOffset)
+ {
+ if (!overflowBlockRead) {
+ return (offset - blockMetadata.getOffset() + usedBytesFromOffset < bufferSize);
+ } else {
+ return (offset - blockMetadata.getOffset() + usedBytesFromOffset < overflowBufferSize);
+ }
+ }
+
+ /**
+ * Gives the number of bytes to be fetched from the stream; the result
+ * depends on whether the main block or the overflow block is being read
+ *
+ * @return bytes to be fetched from stream
+ */
+ protected int calculateBytesToFetch()
+ {
+ return (overflowBlockRead ? overflowBufferSize : (bufferSize));
}
@Override
@@ -186,7 +237,9 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
while (!foundEOL) {
tmpBuilder.reset();
if (posInStr == 0) {
- bytesRead = stream.read(offset + usedBytes, buffer, 0, bufferSize);
+ int bytesToFetch = calculateBytesToFetch();
+ overflowBlockRead = true;
+ bytesRead = readData(usedBytes, bytesToFetch);
if (bytesRead == -1) {
break;
}
@@ -220,14 +273,13 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
usedBytes += emptyBuilder.toByteArray().length;
} else {
//end of stream reached
- if (bytesRead < bufferSize) {
+ if (checkEndOfStream(usedBytes)) {
break;
}
//read more bytes from the input stream
posInStr = 0;
}
}
- posInStr = 0;
//when end of stream is reached then bytesRead is -1
if (bytesRead == -1) {
lineBuilder.reset();
@@ -260,6 +312,47 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
{
return this.bufferSize;
}
+
+ /**
+ * Sets the overflow buffer size of read.
+ *
+ * @param overflowBufferSize
+ * size of the overflow buffer
+ */
+ public void setOverflowBufferSize(int overflowBufferSize)
+ {
+ this.overflowBufferSize = overflowBufferSize;
+ }
+
+ /**
+ * @param buffer
+ * the bytes read from the source
+ */
+ protected void setBuffer(byte[] buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ /**
+ * Sets whether to read overflow block during next fetch.
+ *
+ * @param overflowBlockRead
+ * boolean indicating whether to read overflow block during next read
+ */
+ public void setOverflowBlockRead(boolean overflowBlockRead)
+ {
+ this.overflowBlockRead = overflowBlockRead;
+ }
+
+ /**
+ * Returns a boolean indicating whether to read overflow block during next read
+ *
+ * @return overflowBlockRead
+ */
+ protected boolean isOverflowBlockRead()
+ {
+ return overflowBlockRead;
+ }
}
/**
@@ -280,7 +373,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
super.initialize(stream, blockMetadata, consecutiveBlock);
//ignore first entity of all the blocks except the first one because those bytes
//were used during the parsing of the previous block.
- if (!consecutiveBlock && blockMetadata.getOffset() != 0) {
+ if (blockMetadata.getPreviousBlockId() != -1 && blockMetadata.getOffset() != 0) {
try {
Entity entity = readEntity();
offset += entity.usedBytes;
@@ -300,6 +393,15 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
}
return null;
}
+
+ @Override
+ protected int calculateBytesToFetch()
+ {
+ /*
+ * With readAheadLineReaderContext, we always read at least one overflowBlock. Hence, fetch it in advance
+ */
+ return (this.isOverflowBlockRead() ? overflowBufferSize : (bufferSize + overflowBufferSize));
+ }
}
/**
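To make the overflow logic above concrete, a sketch with illustrative numbers (setBufferSize is the pre-existing setter on LineReaderContext):

    ReaderContext.ReadAheadLineReaderContext<FSDataInputStream> ctx =
        new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
    ctx.setBufferSize(1000);         // bytes of the block itself
    ctx.setOverflowBufferSize(200);  // extra bytes per fetch once a record overflows
    // First fetch: calculateBytesToFetch() returns 1000 + 200 = 1200 bytes, so the
    // delimiter of the block's last record is usually found in a single request.
    // Later fetches (overflowBlockRead == true) return 200 bytes each, until the
    // delimiter is found or checkEndOfStream() reports end of file.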
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index c002c18..74addf7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -178,7 +178,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
*/
protected BlockMetadata.FileBlockMetadata createBlockMetadata(FileMetadata fileMetadata)
{
- return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath());
+ return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getFileLength());
}
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index b9594b3..0949aba 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -293,7 +293,8 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener,
FileMetadata fileMetadata, boolean isLast)
{
return new FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getBlockIds()[blockNumber - 1], pos,
- lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2]);
+ lengthOfFileInBlock, isLast, blockNumber == 1 ? -1 : fileMetadata.getBlockIds()[blockNumber - 2],
+ fileMetadata.getFileLength());
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
index 42231bb..e88191c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java
@@ -69,7 +69,7 @@ public class S3BlockReader extends FSSliceReader
* @param s3uri s3 uri
* @return name of the bucket
*/
- protected static String extractBucket(String s3uri)
+ public static String extractBucket(String s3uri)
{
return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
}
@@ -79,7 +79,7 @@ public class S3BlockReader extends FSSliceReader
* @param s3uri given s3 uri
* @return the accessKey
*/
- protected static String extractAccessKey(String s3uri)
+ public static String extractAccessKey(String s3uri)
{
return s3uri.substring(s3uri.indexOf("://") + 3, s3uri.indexOf(':', s3uri.indexOf("://") + 3));
}
@@ -89,7 +89,7 @@ public class S3BlockReader extends FSSliceReader
* @param s3uri given s3uri
* @return the secretAccessKey
*/
- protected static String extractSecretAccessKey(String s3uri)
+ public static String extractSecretAccessKey(String s3uri)
{
return s3uri.substring(s3uri.indexOf(':', s3uri.indexOf("://") + 1) + 1, s3uri.indexOf('@'));
}
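With these helpers now public, other S3 components (like the record reader module below) can parse the pieces out of a files URI of the form SCHEME://AccessKey:SecretKey@BucketName/path. A small illustration with made-up credentials; note the substring-based parsing assumes the keys themselves contain no ':' or '@':

    String s3uri = "s3n://MYACCESSKEY:mySecretKey@my-bucket/input/file1.txt";
    S3BlockReader.extractBucket(s3uri);          // "my-bucket"
    S3BlockReader.extractAccessKey(s3uri);       // "MYACCESSKEY"
    S3BlockReader.extractSecretAccessKey(s3uri); // "mySecretKey"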
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
index 268c51b..2d82ac4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java
@@ -20,6 +20,9 @@
package org.apache.apex.malhar.lib.fs;
import java.io.IOException;
+
+import javax.validation.constraints.Pattern;
+
import org.apache.hadoop.fs.FSDataInputStream;
import com.datatorrent.api.Context.OperatorContext;
@@ -72,15 +75,36 @@ public class FSRecordReader extends FSSliceReader
{
super.setup(context);
if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
- ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
- fixedBytesReaderContext.setLength(recordLength);
- readerContext = fixedBytesReaderContext;
+ readerContext = createFixedWidthReaderContext();
} else {
- readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+ readerContext = createDelimitedReaderContext();
}
}
/**
+ * Creates a recordReaderContext for FixedWidthRecords
+ *
+ * @return FixedBytesReaderContext
+ */
+ protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext()
+ {
+ ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+ fixedBytesReaderContext.setLength(recordLength);
+ return fixedBytesReaderContext;
+
+ }
+
+ /**
+ * Creates a recordReaderContext for Delimited Records
+ *
+ * @return DelimitedRecordReaderContext
+ */
+ protected ReaderContext<FSDataInputStream> createDelimitedReaderContext()
+ {
+ return new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+ }
+
+ /**
* Read the block data and emit records based on reader context
*
* @param blockMetadata
@@ -105,14 +129,15 @@ public class FSRecordReader extends FSSliceReader
}
/**
- * Criteria for record split
+ * Criteria for record split : FIXED_WIDTH_RECORD or DELIMITED_RECORD
*
* @param mode
* Mode
*/
- public void setMode(RECORD_READER_MODE mode)
+ public void setMode(
+ @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode)
{
- this.mode = mode;
+ this.mode = RECORD_READER_MODE.valueOf(mode.toUpperCase());
}
/**
@@ -120,9 +145,9 @@ public class FSRecordReader extends FSSliceReader
*
* @return mode
*/
- public RECORD_READER_MODE getMode()
+ public String getMode()
{
- return mode;
+ return mode.toString();
}
/**
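Since the mode is now a validated String instead of the enum, it can be set directly from configuration. A minimal sketch of the new setter's behavior:

    FSRecordReader reader = new FSRecordReader();
    reader.setMode("delimited_record"); // case-insensitive; @Pattern restricts it to
                                        // FIXED_WIDTH_RECORD | DELIMITED_RECORD
    reader.getMode();                   // returns "DELIMITED_RECORD"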
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
index b727248..65f7e5f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java
@@ -21,6 +21,7 @@ package org.apache.apex.malhar.lib.fs;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
import javax.validation.constraints.Size;
import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
@@ -82,7 +83,7 @@ public class FSRecordReaderModule implements Module
/**
* Criteria for record split
*/
- private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+ private String mode = RECORD_READER_MODE.DELIMITED_RECORD.toString();
/**
* Length for fixed width record
@@ -370,24 +371,25 @@ public class FSRecordReaderModule implements Module
}
/**
- * Criteria for record split
+ * Criteria for record split : FIXED_WIDTH_RECORD or DELIMITED_RECORD
*
- * @return mode
+ * @param mode
+ * Mode
*/
- public RECORD_READER_MODE getMode()
+ public void setMode(
+ @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode)
{
- return mode;
+ this.mode = mode;
}
/**
* Criteria for record split
*
- * @param mode
- * Mode
+ * @return mode
*/
- public void setMode(RECORD_READER_MODE mode)
+ public String getMode()
{
- this.mode = mode;
+ return mode;
}
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java
new file mode 100644
index 0000000..9de2896
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReader.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.esotericsoftware.kryo.NotNull;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from S3 in parallel
+ * (without ordering guarantees between tuples). Records can be delimited (e.g.
+ * by newline) or fixed width. Output tuples are byte[].
+ *
+ * Typically, this operator will be connected to the output of FileSplitterInput
+ * to read records in parallel.
+ */
+@Evolving
+public class S3RecordReader extends FSRecordReader
+{
+ private String endPoint;
+ @NotNull
+ private String bucketName;
+ @NotNull
+ private String accessKey;
+ @NotNull
+ private String secretAccessKey;
+ private int overflowBufferSize;
+
+ public S3RecordReader()
+ {
+ /*
+ * Set default overflowBufferSize to 1MB
+ */
+ overflowBufferSize = 1024 * 1024;
+ }
+
+ /**
+ * S3 reader doesn't make use of any stream, hence returns a null value
+ *
+ * @param block
+ * block metadata
+ * @return stream (null object)
+ * @throws IOException
+ */
+ @Override
+ protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+ {
+ return null;
+ }
+
+ /**
+ * Returns an instance of S3FixedWidthRecordReaderContext after setting
+ * recordLength and bucketName and initializing s3Client
+ *
+ * @return S3FixedWidthRecordReaderContext
+ */
+ @Override
+ protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext()
+ {
+ S3FixedWidthRecordReaderContext fixedBytesReaderContext = new S3FixedWidthRecordReaderContext();
+ fixedBytesReaderContext.setLength(this.getRecordLength());
+ fixedBytesReaderContext.getS3Params().initializeS3Client(accessKey, secretAccessKey, endPoint);
+ fixedBytesReaderContext.getS3Params().setBucketName(bucketName);
+ return fixedBytesReaderContext;
+ }
+
+ /**
+ * Returns an instance of S3DelimitedRecordReaderContext after setting
+ * bucketName and overflowBufferSize and initializing the s3Client
+ *
+ * @return S3DelimitedRecordReaderContext
+ */
+ @Override
+ protected ReaderContext<FSDataInputStream> createDelimitedReaderContext()
+ {
+ S3DelimitedRecordReaderContext delimitedRecordReaderContext = new S3DelimitedRecordReaderContext();
+ delimitedRecordReaderContext.getS3Params().initializeS3Client(accessKey, secretAccessKey, endPoint);
+ delimitedRecordReaderContext.getS3Params().setBucketName(bucketName);
+ delimitedRecordReaderContext.setOverflowBufferSize(overflowBufferSize);
+ return delimitedRecordReaderContext;
+ }
+
+ /**
+ * S3RecordReaderParams is used to hold the common parameters used by the
+ * DelimitedRecordReaderContext and FixedWidthRecordReaderContext for S3
+ */
+ protected static class S3RecordReaderParams
+ {
+ /**
+ * Amazon client used to read bytes from S3
+ */
+ private AmazonS3 s3Client;
+ /**
+ * S3 bucket name
+ */
+ private String bucketName;
+ /**
+ * path of file being processed in bucket
+ */
+ private String filePath;
+ /**
+ * length of the file being processed
+ */
+ private long fileLength;
+
+ /**
+ * Initializes the AmazonS3 client using the accessKey and secretAccessKey;
+ * sets the endpoint for the s3Client if provided
+ *
+ * @param accessKey
+ * @param secretAccessKey
+ * @param endPoint
+ */
+ public void initializeS3Client(@javax.validation.constraints.NotNull String accessKey,
+ @javax.validation.constraints.NotNull String secretAccessKey, String endPoint)
+ {
+ Preconditions.checkNotNull(accessKey);
+ Preconditions.checkNotNull(secretAccessKey);
+ s3Client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+ if (endPoint != null) {
+ s3Client.setEndpoint(endPoint);
+ }
+ }
+
+ /**
+ * Set the AmazonS3 service
+ *
+ * @param s3Client
+ * given s3Client
+ */
+ public void setS3Client(@javax.validation.constraints.NotNull AmazonS3 s3Client)
+ {
+ Preconditions.checkNotNull(s3Client);
+ this.s3Client = s3Client;
+ }
+
+ /**
+ * Returns the AmazonS3 service
+ *
+ * @return s3Client
+ */
+ public AmazonS3 getS3Client()
+ {
+ return s3Client;
+ }
+
+ /**
+ * Set the bucket name
+ *
+ * @param bucketName
+ * given bucketName
+ */
+ public void setBucketName(@javax.validation.constraints.NotNull String bucketName)
+ {
+ Preconditions.checkNotNull(bucketName);
+ this.bucketName = bucketName;
+ }
+
+ /**
+ * Returns the bucket name
+ *
+ * @return bucketName
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Returns the file path
+ *
+ * @return filePath
+ */
+ public String getFilePath()
+ {
+ return filePath;
+ }
+
+ /**
+ * Returns the length of the file to which the block belongs
+ *
+ * @return fileLength
+ */
+ public long getFileLength()
+ {
+ return fileLength;
+ }
+
+ /**
+ * This method reads the blockMetadata input parameter and initializes the
+ * filePath and fileLength
+ *
+ * @param blockMetadata
+ */
+ public void initialzeFilepathAndFileLength(BlockMetadata blockMetadata)
+ {
+ if (blockMetadata instanceof BlockMetadata.FileBlockMetadata) {
+ BlockMetadata.FileBlockMetadata fileBlockMetadata = (BlockMetadata.FileBlockMetadata)blockMetadata;
+ fileLength = fileBlockMetadata.getFileLength();
+ filePath = fileBlockMetadata.getFilePath();
+ // File path would be the path after bucket name.
+ // Check if the file path starts with "/"
+ if (filePath.startsWith("/")) {
+ filePath = filePath.substring(1);
+ }
+ }
+ }
+ }
+
+ /**
+ * RecordReaderContext for reading delimited S3 Records.
+ */
+ protected static class S3DelimitedRecordReaderContext
+ extends ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>
+ {
+ /**
+ * S3 parameters
+ */
+ private transient S3RecordReaderParams s3Params;
+
+ public S3DelimitedRecordReaderContext()
+ {
+ s3Params = new S3RecordReaderParams();
+ }
+
+ @Override
+ public void initialize(FSDataInputStream stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
+ {
+ super.initialize(stream, blockMetadata, consecutiveBlock);
+ s3Params.initialzeFilepathAndFileLength(blockMetadata);
+ /*
+ * Initialize the bufferSize and overflowBufferSize
+ */
+ int bufferSize = Long.valueOf(blockMetadata.getLength() - blockMetadata.getOffset()).intValue();
+ this.setBufferSize(bufferSize);
+ if (overflowBufferSize > bufferSize) {
+ this.setOverflowBufferSize(bufferSize);
+ } else {
+ this.setOverflowBufferSize(overflowBufferSize);
+ }
+ }
+
+ /**
+ * The S3 block read is achieved through the AmazonS3 client as follows:
+ * (1) Create the objectRequest from bucketName and filePath. (2) Set the
+ * range on the objectRequest. (3) Get the object portion through the
+ * AmazonS3 client API. (4) Get the object content from the object portion.
+ *
+ * @param bytesFromCurrentOffset
+ * bytes read till now from current offset
+ * @param bytesToFetch
+ * the number of bytes to be fetched
+ * @return the number of bytes read, -1 if 0 bytes read
+ * @throws IOException
+ */
+
+ @Override
+ protected int readData(final long bytesFromCurrentOffset, final int bytesToFetch) throws IOException
+ {
+ GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
+ rangeObjectRequest.setRange(offset + bytesFromCurrentOffset, offset + bytesFromCurrentOffset + bytesToFetch - 1);
+ S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
+ S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
+ buffer = ByteStreams.toByteArray(wrappedStream);
+ wrappedStream.close();
+ int bufferLength = buffer.length;
+ if (bufferLength <= 0) {
+ return -1;
+ }
+ return bufferLength;
+ }
+
+ @Override
+ protected boolean checkEndOfStream(final long usedBytesFromOffset)
+ {
+ if ((offset + usedBytesFromOffset) >= s3Params.fileLength) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns the S3RecordReaderParams object
+ *
+ * @return s3Params
+ */
+ protected S3RecordReaderParams getS3Params()
+ {
+ return s3Params;
+ }
+ }
+
+ /**
+ * RecordReaderContext for reading fixed width S3 Records.
+ */
+ protected static class S3FixedWidthRecordReaderContext
+ extends ReaderContext.FixedBytesReaderContext<FSDataInputStream>
+ {
+ /**
+ * S3 parameters
+ */
+ private transient S3RecordReaderParams s3Params;
+
+ /**
+ * used to hold data retrieved from S3
+ */
+ protected transient byte[] buffer;
+
+ /**
+ * current offset within the byte[] buffer
+ */
+ private transient int bufferOffset;
+
+ public S3FixedWidthRecordReaderContext()
+ {
+ s3Params = new S3RecordReaderParams();
+ }
+
+ @Override
+ public void initialize(FSDataInputStream stream, BlockMetadata blockMetadata, boolean consecutiveBlock)
+ {
+ super.initialize(stream, blockMetadata, consecutiveBlock);
+ s3Params.initialzeFilepathAndFileLength(blockMetadata);
+ try {
+ int bytesRead = this.getBlockFromS3();
+ if (bytesRead == -1) {
+ return;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.setBufferOffset(0);
+ }
+
+ /**
+ * The S3 block read is achieved through the AmazonS3 client as follows:
+ * (1) Create the objectRequest from bucketName and filePath. (2) Set the
+ * range on the objectRequest so that it aligns with the fixed width
+ * records. (3) Get the object portion through the AmazonS3 client API.
+ * (4) Get the object content from the object portion.
+ *
+ * @return the number of bytes read, -1 if no record starts in this block
+ * @throws IOException
+ */
+ protected int getBlockFromS3() throws IOException
+ {
+ long startOffset = blockMetadata.getOffset()
+ + (this.length - (blockMetadata.getOffset() % this.length)) % this.length;
+ long endOffset = blockMetadata.getLength()
+ + ((this.length - (blockMetadata.getLength() % this.length)) % this.length) - 1;
+ if (endOffset == (startOffset - 1) || startOffset > s3Params.fileLength) {
+ /*
+ * If the endOffset is before the startOffset, no record starts in this block
+ */
+ return -1;
+ }
+ if (endOffset >= s3Params.fileLength) {
+ endOffset = s3Params.fileLength - 1;
+ }
+ offset = startOffset;
+ return readData(startOffset, endOffset);
+ }
+
+ /**
+ * Reads data from S3 starting from startOffset till the endOffset and
+ * returns the number of bytes read
+ *
+ * @param startOffset
+ * offset from where to read
+ * @param endOffset
+ * offset till where to read
+ * @return number of bytes read
+ * @throws IOException
+ */
+ protected int readData(long startOffset, long endOffset) throws IOException
+ {
+ GetObjectRequest rangeObjectRequest = new GetObjectRequest(s3Params.bucketName, s3Params.filePath);
+ rangeObjectRequest.setRange(startOffset, endOffset);
+ S3Object objectPortion = s3Params.s3Client.getObject(rangeObjectRequest);
+ S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
+ buffer = ByteStreams.toByteArray(wrappedStream);
+ wrappedStream.close();
+ return buffer.length;
+ }
+
+ @Override
+ protected ReaderContext.Entity readEntity() throws IOException
+ {
+ entity.clear();
+ /*
+ * In case file length is not a multiple of record length, the last record may not have length = recordLength.
+ * The data to be read from buffer array should be less in this case.
+ */
+ long bufferLength = length;
+ if (offset + length > s3Params.fileLength) {
+ bufferLength = s3Params.fileLength - offset;
+ }
+ byte[] record = Arrays.copyOfRange(buffer, Long.valueOf(bufferOffset).intValue(),
+ Long.valueOf(bufferOffset + bufferLength).intValue());
+ bufferOffset += record.length;
+ entity.setRecord(record);
+ entity.setUsedBytes(record.length);
+ return entity;
+ }
+
+ /**
+ * Sets the offset within the current buffer
+ *
+ * @param bufferOffset
+ * offset within the current buffer
+ */
+ protected void setBufferOffset(int bufferOffset)
+ {
+ this.bufferOffset = bufferOffset;
+ }
+
+ /**
+ * Sets the S3RecordReaderParams object
+ *
+ * @param s3Params
+ * S3RecordReaderParams object
+ */
+ protected void setS3Params(S3RecordReaderParams s3Params)
+ {
+ this.s3Params = s3Params;
+ }
+
+ /**
+ * Returns the S3RecordReaderParams object
+ *
+ * @return s3Params
+ */
+ protected S3RecordReaderParams getS3Params()
+ {
+ return s3Params;
+ }
+ }
+
+ /**
+ * Number of bytes to be retrieved when a record overflows
+ *
+ * @return overflowBufferSize
+ */
+ public int getOverflowBufferSize()
+ {
+ return overflowBufferSize;
+ }
+
+ /**
+ * Number of bytes to be retrieved when a record overflows
+ *
+ * @param overflowBufferSize
+ */
+ public void setOverflowBufferSize(int overflowBufferSize)
+ {
+ this.overflowBufferSize = overflowBufferSize;
+ }
+
+ /**
+ * Get the S3 bucket name
+ *
+ * @return bucket
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Set the bucket name where the file resides
+ *
+ * @param bucketName
+ * bucket name
+ */
+ public void setBucketName(@javax.validation.constraints.NotNull String bucketName)
+ {
+ Preconditions.checkNotNull(bucketName);
+ this.bucketName = bucketName;
+ }
+
+ /**
+ * Return the access key
+ *
+ * @return the accessKey
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Set the access key
+ *
+ * @param accessKey
+ * given accessKey
+ */
+ public void setAccessKey(@javax.validation.constraints.NotNull String accessKey)
+ {
+ Preconditions.checkNotNull(accessKey);
+ this.accessKey = accessKey;
+ }
+
+ /**
+ * Return the secretAccessKey
+ *
+ * @return the secretAccessKey
+ */
+ public String getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ /**
+ * Set the secretAccessKey
+ *
+ * @param secretAccessKey
+ * secretAccessKey
+ */
+ public void setSecretAccessKey(@javax.validation.constraints.NotNull String secretAccessKey)
+ {
+ Preconditions.checkNotNull(secretAccessKey);
+ this.secretAccessKey = secretAccessKey;
+ }
+
+ /**
+ * S3 endpoint
+ *
+ * @param endPoint
+ * endpoint to be used for S3
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+
+ /**
+ * S3 endpoint
+ *
+ * @return s3 endpoint
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+}
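A worked example of the record-alignment arithmetic in getBlockFromS3(), with made-up numbers:

    // Assume recordLength (this.length) = 123 and a block spanning offsets [1000, 2000).
    // startOffset = 1000 + (123 - 1000 % 123) % 123 = 1000 + 107 = 1107 (= 9 * 123),
    // the first record that starts inside this block.
    // endOffset = 2000 + (123 - 2000 % 123) % 123 - 1 = 2000 + 91 - 1 = 2090 (= 17 * 123 - 1),
    // so the last record starting in this block is read to completion even though
    // it spills past offset 2000; the next block skips that record because its own
    // startOffset is rounded up past 2090.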
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java
new file mode 100644
index 0000000..884073c
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModule.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import javax.validation.constraints.Min;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
+
+import com.datatorrent.lib.io.fs.S3BlockReader;
+
+/**
+ * This module is used for reading records/tuples from S3. Records can be read
+ * in parallel using multiple partitions of the record reader operator. (Ordering is
+ * not guaranteed when records are read in parallel)
+ *
+ * Input S3 directory is scanned at specified interval to poll for new data.
+ *
+ * The module reads data in parallel; the following parameters can be configured
+ * <br/>
+ * 1. files: List of file(s)/directories to read. files would be in the form of
+ * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory ,
+ * SCHEME://AccessKey:SecretKey@BucketName/FileOrDirectory , .... where SCHEME
+ * is the protocol scheme for the file system. AccessKey is the AWS access key
+ * and SecretKey is the AWS Secret Key<br/>
+ * 2. filePatternRegularExp: Files with names matching given regex will be read
+ * <br/>
+ * 3. scanIntervalMillis: interval between two scans to discover new files in
+ * input directory<br/>
+ * 4. recursive: if true, scan input directories recursively<br/>
+ * 5. blockSize: block size used to read input blocks of file, default value
+ * 64MB<br/>
+ * 6. overflowBlockSize: For delimited records, this value represents the
+ * additional data that needs to be read to find the delimiter character for
+ * the last record in a block. This should be set to the approximate record
+ * size in the file; default value is 1MB<br/>
+ * 7. sequentialFileRead: if true, then each reader partition will read a
+ * different file instead of reading different offsets of the same file
+ * (file level parallelism instead of block level parallelism)<br/>
+ * 8. blocksThreshold: number of blocks emitted per window<br/>
+ * 9. minReaders: Minimum number of block readers for dynamic partitioning<br/>
+ * 10. maxReaders: Maximum number of block readers for dynamic partitioning<br/>
+ * 11. repartitionCheckInterval: Interval for re-evaluating dynamic
+ * partitioning<br/>
+ * 12. s3EndPoint: Optional parameter used to specify S3 endpoint to use
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3RecordReaderModule extends FSRecordReaderModule
+{
+ /**
+ * Endpoint for S3
+ */
+ private String s3EndPoint;
+ @Min(0)
+ private int overflowBlockSize;
+
+ /**
+ * Creates an instance of Record Reader
+ *
+ * @return S3RecordReader instance
+ */
+ @Override
+ public S3RecordReader createRecordReader()
+ {
+ S3RecordReader s3RecordReader = new S3RecordReader();
+ s3RecordReader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+ s3RecordReader.setAccessKey(S3BlockReader.extractAccessKey(getFiles()));
+ s3RecordReader.setSecretAccessKey(S3BlockReader.extractSecretAccessKey(getFiles()));
+ s3RecordReader.setEndPoint(s3EndPoint);
+ s3RecordReader.setMode(this.getMode());
+ s3RecordReader.setRecordLength(this.getRecordLength());
+ if (overflowBlockSize != 0) {
+ s3RecordReader.setOverflowBufferSize(overflowBlockSize);
+ }
+ return s3RecordReader;
+ }
+
+ /**
+ * Set the S3 endpoint to use
+ *
+ * @param s3EndPoint
+ */
+ public void setS3EndPoint(String s3EndPoint)
+ {
+ this.s3EndPoint = s3EndPoint;
+ }
+
+ /**
+ * Returns the s3 endpoint
+ *
+ * @return s3EndPoint
+ */
+ public String getS3EndPoint()
+ {
+ return s3EndPoint;
+ }
+
+ /**
+ * Additional data that needs to be read to find the delimiter character for
+ * the last record in a block. This should be set to the approximate record
+ * size in the file; default value is 1MB
+ *
+ * @param overflowBlockSize
+ */
+ public void setOverflowBlockSize(int overflowBlockSize)
+ {
+ this.overflowBlockSize = overflowBlockSize;
+ }
+
+ /**
+ * Returns the overflow block size
+ *
+ * @return overflowBlockSize
+ */
+ public int getOverflowBlockSize()
+ {
+ return overflowBlockSize;
+ }
+}
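A sketch of wiring the module into an application DAG, along the lines of the test applications later in this commit; the downstream operator (validator) and property values are placeholders:

    S3RecordReaderModule reader = dag.addModule("S3RecordReaderModule", S3RecordReaderModule.class);
    reader.setFiles("s3n://accessKey:secretKey@bucket/input"); // setFiles is assumed from FSRecordReaderModule, which exposes getFiles()
    reader.setMode("delimited_record");
    reader.setOverflowBlockSize(1024 * 1024); // roughly one record's worth of bytes
    dag.addStream("records", reader.records, validator.data); // validator is illustrative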
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
index 5ddc8a9..2fbb9e7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java
@@ -24,6 +24,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
@@ -166,18 +168,47 @@ public class FSLineReaderTest
testMeta.blockReader.beginWindow(1);
- for (int i = 0; i < noOfBlocks; i++) {
- BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
- testMeta.dataFile.getAbsolutePath(), i,
- i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
- -1);
- testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) {
+ int blockNo = 10 * j + i;
+ if (blockNo >= noOfBlocks) {
+ continue;
+ }
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(
+ testMeta.dataFile.getAbsolutePath(), blockNo, blockNo * blockSize,
+ blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize,
+ blockNo == noOfBlocks - 1, blockNo - 1);
+ testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+ }
}
testMeta.blockReader.endWindow();
List<Object> messages = testMeta.messageSink.collectedTuples;
Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+
+ Collections.sort(testMeta.messages, new Comparator<String[]>()
+ {
+ @Override
+ public int compare(String[] rec1, String[] rec2)
+ {
+ return compareStringArrayRecords(rec1, rec2);
+ }
+ });
+
+ Collections.sort(messages, new Comparator<Object>()
+ {
+ @Override
+ public int compare(Object object1, Object object2)
+ {
+ @SuppressWarnings("unchecked")
+ String[] rec1 = ((AbstractBlockReader.ReaderRecord<String>)object1).getRecord().split(",");
+ @SuppressWarnings("unchecked")
+ String[] rec2 = ((AbstractBlockReader.ReaderRecord<String>)object2).getRecord().split(",");
+ return compareStringArrayRecords(rec1, rec2);
+ }
+ });
+
for (int i = 0; i < messages.size(); i++) {
@SuppressWarnings("unchecked")
AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i);
@@ -198,6 +229,24 @@ public class FSLineReaderTest
}
}
+ /**
+ * Utility function to compare lexicographically 2 records of string arrays
+ *
+ * @param rec1
+ * @param rec2
+ * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise
+ */
+ private int compareStringArrayRecords(String[] rec1, String[] rec2)
+ {
+ for (int i = 0; i < rec1.length && i < rec2.length; i++) {
+ if (rec1[i].equals(rec2[i])) {
+ continue;
+ }
+ return rec1[i].compareTo(rec2[i]);
+ }
+ return 0;
+ }
+
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
index 8560228..642621d 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
@@ -32,7 +32,6 @@ import org.junit.rules.TestWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -118,6 +117,10 @@ public class FSRecordReaderTest
}
};
+ public static Set<String> getRecords()
+ {
+ return records;
+ }
}
private static class DelimitedApplication implements StreamingApplication
@@ -126,7 +129,7 @@ public class FSRecordReaderTest
public void populateDAG(DAG dag, Configuration conf)
{
FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
- recordReader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);
+ recordReader.setMode("delimited_record");
DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
dag.addStream("records", recordReader.records, validator.data);
}
@@ -206,7 +209,7 @@ public class FSRecordReaderTest
public void populateDAG(DAG dag, Configuration conf)
{
FSRecordReaderModule recordReader = dag.addModule("HDFSRecordReaderModule", FSRecordReaderModule.class);
- recordReader.setMode(RECORD_READER_MODE.FIXED_WIDTH_RECORD);
+ recordReader.setMode("FIXED_WIDTH_RECORD");
FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
dag.addStream("records", recordReader.records, validator.data);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java
new file mode 100644
index 0000000..a85e2c8
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3DelimitedRecordReaderTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.AbstractFSBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.netlet.util.Slice;
+
+@Ignore
+public class S3DelimitedRecordReaderTest
+{
+ private final String accessKey = "*************";
+ private final String secretKey = "*********************";
+ private static final int overflowBufferSize = 123;
+ private static final String FILE_1 = "file1.txt";
+ private static final String s3Directory = "input/";
+
+ AbstractFSBlockReader<Slice> getBlockReader(String bucketKey)
+ {
+ S3RecordReader blockReader = new S3RecordReader();
+
+ blockReader.setAccessKey(accessKey);
+ blockReader.setSecretAccessKey(secretKey);
+ blockReader.setBucketName(bucketKey);
+ blockReader.setOverflowBufferSize(overflowBufferSize);
+ return blockReader;
+ }
+
+ class TestMeta extends TestWatcher
+ {
+ private Context.OperatorContext readerContext;
+ private AbstractFSBlockReader<Slice> blockReader;
+ private CollectorTestSink<Object> blockMetadataSink;
+ private CollectorTestSink<Object> messageSink;
+ private List<String[]> messages = Lists.newArrayList();
+ private String appId;
+ private String dataFilePath;
+ private File dataFile;
+ private String bucketKey;
+ private AmazonS3 client;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ dataFilePath = "src/test/resources/reader_test_data.csv";
+ dataFile = new File(dataFilePath);
+ bucketKey = new String("target-" + description.getMethodName()).toLowerCase();
+
+ client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+ client.createBucket(bucketKey);
+
+ client.putObject(new PutObjectRequest(bucketKey, s3Directory + FILE_1, dataFile));
+
+ appId = Long.toHexString(System.currentTimeMillis());
+ blockReader = getBlockReader(bucketKey);
+
+ Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap();
+ readerAttr.put(DAG.APPLICATION_ID, appId);
+ readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10);
+ readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr);
+
+ blockReader.setup(readerContext);
+
+ messageSink = new CollectorTestSink<>();
+ ((S3RecordReader)blockReader).records.setSink(messageSink);
+
+ blockMetadataSink = new CollectorTestSink<>();
+ blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
+
+ BufferedReader reader;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile.getAbsolutePath())));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ messages.add(line.split(","));
+ }
+ reader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ deleteBucketAndContent();
+ blockReader.teardown();
+ }
+
+ public void deleteBucketAndContent()
+ {
+ //Get the list of objects
+ ObjectListing objectListing = client.listObjects(bucketKey);
+ for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext();) {
+ S3ObjectSummary objectSummary = (S3ObjectSummary)iterator.next();
+ LOG.info("Deleting an object: {}", objectSummary.getKey());
+ client.deleteObject(bucketKey, objectSummary.getKey());
+ }
+ client.deleteBucket(bucketKey);
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ /**
+ * The file is processed as a single block
+ */
+ @Test
+ public void testSingleBlock()
+ {
+ BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, 0L, 0L,
+ testMeta.dataFile.length(), true, -1, testMeta.dataFile.length());
+
+ testMeta.blockReader.beginWindow(1);
+ testMeta.blockReader.blocksMetadataInput.process(block);
+ testMeta.blockReader.endWindow();
+
+ List<Object> actualMessages = testMeta.messageSink.collectedTuples;
+ Assert.assertEquals("No of records", testMeta.messages.size(), actualMessages.size());
+
+ for (int i = 0; i < actualMessages.size(); i++) {
+ byte[] msg = (byte[])actualMessages.get(i);
+ Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i)));
+ }
+ }
+
+ /**
+ * The file is divided into multiple blocks, blocks are processed
+ * consecutively
+ */
+ @Test
+ public void testMultipleBlocks()
+ {
+ long blockSize = 1000;
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+ + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+ testMeta.blockReader.beginWindow(1);
+
+ for (int i = 0; i < noOfBlocks; i++) {
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, i,
+ i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
+ i - 1, testMeta.dataFile.length());
+ testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+ }
+
+ testMeta.blockReader.endWindow();
+
+ List<Object> messages = testMeta.messageSink.collectedTuples;
+ Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+ for (int i = 0; i < messages.size(); i++) {
+
+ byte[] msg = (byte[])messages.get(i);
+ Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i)));
+ }
+ }
+
+ /**
+ * The file is divided into multiple blocks, blocks are processed
+ * non-consecutively
+ */
+ @Test
+ public void testNonConsecutiveBlocks()
+ {
+ long blockSize = 1000;
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+ + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+ testMeta.blockReader.beginWindow(1);
+
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) {
+ int blockNo = 10 * j + i;
+ if (blockNo >= noOfBlocks) {
+ continue;
+ }
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1,
+ blockNo, blockNo * blockSize,
+ blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize,
+ blockNo == noOfBlocks - 1, blockNo - 1, testMeta.dataFile.length());
+ testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+ }
+ }
+
+ testMeta.blockReader.endWindow();
+
+ List<Object> messages = testMeta.messageSink.collectedTuples;
+ Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+
+ Collections.sort(testMeta.messages, new Comparator<String[]>()
+ {
+ @Override
+ public int compare(String[] rec1, String[] rec2)
+ {
+ return compareStringArrayRecords(rec1, rec2);
+ }
+ });
+
+ Collections.sort(messages, new Comparator<Object>()
+ {
+ @Override
+ public int compare(Object object1, Object object2)
+ {
+ String[] rec1 = new String((byte[])object1).split(",");
+ String[] rec2 = new String((byte[])object2).split(",");
+ return compareStringArrayRecords(rec1, rec2);
+ }
+ });
+ for (int i = 0; i < messages.size(); i++) {
+ byte[] msg = (byte[])messages.get(i);
+ Assert.assertTrue("line " + i, Arrays.equals(new String(msg).split(","), testMeta.messages.get(i)));
+ }
+ }
+
+ /**
+ * Utility function to compare lexicographically 2 records of string arrays
+ *
+ * @param rec1
+ * @param rec2
+ * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise
+ */
+ private int compareStringArrayRecords(String[] rec1, String[] rec2)
+ {
+ for (int i = 0; i < rec1.length && i < rec2.length; i++) {
+ if (rec1[i].equals(rec2[i])) {
+ continue;
+ }
+ return rec1[i].compareTo(rec2[i]);
+ }
+ return 0;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3DelimitedRecordReaderTest.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java
new file mode 100644
index 0000000..0584973
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3FixedWidthRecordReaderTest.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.block.AbstractFSBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.netlet.util.Slice;
+
+@Ignore
+public class S3FixedWidthRecordReaderTest
+{
+ private final String accessKey = "*************";
+ private final String secretKey = "*********************";
+ private static final int recordLength = 123;
+ private static final String FILE_1 = "file1.txt";
+ private static final String s3Directory = "input/";
+
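+  /**
+   * Creates an S3RecordReader configured to read fixed-width records of
+   * {@code recordLength} bytes from the given bucket.
+   */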
+ AbstractFSBlockReader<Slice> getBlockReader(String bucketKey)
+ {
+ S3RecordReader blockReader = new S3RecordReader();
+ blockReader.setAccessKey(accessKey);
+ blockReader.setSecretAccessKey(secretKey);
+ blockReader.setBucketName(bucketKey);
+ blockReader.setRecordLength(recordLength);
+ blockReader.setMode("FIXED_WIDTH_RECORD");
+ return blockReader;
+ }
+
+ class TestMeta extends TestWatcher
+ {
+ private Context.OperatorContext readerContext;
+ private AbstractFSBlockReader<Slice> blockReader;
+ private CollectorTestSink<Object> blockMetadataSink;
+ private CollectorTestSink<Object> messageSink;
+ private List<String[]> messages = Lists.newArrayList();
+ private String appId;
+ private String dataFilePath;
+ private File dataFile;
+ private String bucketKey;
+ private AmazonS3 client;
+
+ @Override
+    protected void starting(Description description)
+ {
+ dataFilePath = "src/test/resources/reader_test_data.csv";
+ dataFile = new File(dataFilePath);
+      bucketKey = ("target-" + description.getMethodName()).toLowerCase();
+
+ client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+ client.createBucket(bucketKey);
+
+ client.putObject(new PutObjectRequest(bucketKey, s3Directory + FILE_1, dataFile));
+
+ appId = Long.toHexString(System.currentTimeMillis());
+ blockReader = getBlockReader(bucketKey);
+
+ Attribute.AttributeMap.DefaultAttributeMap readerAttr = new Attribute.AttributeMap.DefaultAttributeMap();
+ readerAttr.put(DAG.APPLICATION_ID, appId);
+ readerAttr.put(Context.OperatorContext.SPIN_MILLIS, 10);
+ readerContext = new OperatorContextTestHelper.TestIdOperatorContext(1, readerAttr);
+
+ blockReader.setup(readerContext);
+
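+      // Attach test sinks to capture the emitted records and block metadata.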
+ messageSink = new CollectorTestSink<>();
+ ((S3RecordReader)blockReader).records.setSink(messageSink);
+
+ blockMetadataSink = new CollectorTestSink<>();
+ blockReader.blocksMetadataOutput.setSink(blockMetadataSink);
+
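+      // Build the list of expected records from the local copy of the test file.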
+      try (BufferedReader reader = new BufferedReader(
+          new InputStreamReader(new FileInputStream(dataFile.getAbsolutePath())))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          messages.add(line.split(","));
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ deleteBucketAndContent();
+ blockReader.teardown();
+ }
+
+ public void deleteBucketAndContent()
+ {
+ //Get the list of objects
+ ObjectListing objectListing = client.listObjects(bucketKey);
+      for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+ LOG.info("Deleting an object: {}", objectSummary.getKey());
+ client.deleteObject(bucketKey, objectSummary.getKey());
+ }
+ client.deleteBucket(bucketKey);
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+  /**
+   * The file is processed as a single block.
+   */
+ @Test
+ public void testSingleBlock()
+ {
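+    // A single block covering the whole file: blockId 0, offset 0, length and
+    // fileLength equal to the file size, isLastBlock true, previousBlockId -1.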
+ BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, 0L, 0L,
+ testMeta.dataFile.length(), true, -1, testMeta.dataFile.length());
+
+ testMeta.blockReader.beginWindow(1);
+ testMeta.blockReader.blocksMetadataInput.process(block);
+ testMeta.blockReader.endWindow();
+
+ List<Object> actualMessages = testMeta.messageSink.collectedTuples;
+ Assert.assertEquals("No of records", testMeta.messages.size(), actualMessages.size());
+
+ for (int i = 0; i < actualMessages.size(); i++) {
+ byte[] msg = (byte[])actualMessages.get(i);
+      /*
+       * The last character is removed below since testMeta.messages does not
+       * contain the trailing '\n' that is present in byte[] msg
+       */
+ Assert.assertTrue("line " + i,
+ Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i)));
+ }
+ }
+
+  /**
+   * The file is divided into multiple blocks; the blocks are processed
+   * consecutively.
+   */
+ @Test
+ public void testMultipleBlocks()
+ {
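+    // Split the file into 1000-byte blocks; noOfBlocks = ceil(fileLength / blockSize).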
+ long blockSize = 1000;
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+ + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+ testMeta.blockReader.beginWindow(1);
+
+ for (int i = 0; i < noOfBlocks; i++) {
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1, i,
+ i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1,
+ i - 1, testMeta.dataFile.length());
+ testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+ }
+
+ testMeta.blockReader.endWindow();
+
+ List<Object> messages = testMeta.messageSink.collectedTuples;
+ Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+    for (int i = 0; i < messages.size(); i++) {
+      byte[] msg = (byte[])messages.get(i);
+ Assert.assertTrue("line " + i,
+ Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i)));
+ }
+ }
+
+  /**
+   * The file is divided into multiple blocks; the blocks are processed
+   * non-consecutively.
+   */
+ @Test
+ public void testNonConsecutiveBlocks()
+ {
+ long blockSize = 1000;
+ int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize)
+ + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1));
+
+ testMeta.blockReader.beginWindow(1);
+
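+    // Feed the blocks out of order (0, 10, 20, ..., then 1, 11, 21, ...); the
+    // reader must still emit every record exactly once.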
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < Math.ceil(noOfBlocks / 10.0); j++) {
+ int blockNo = 10 * j + i;
+ if (blockNo >= noOfBlocks) {
+ continue;
+ }
+ BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(s3Directory + FILE_1,
+ blockNo, blockNo * blockSize,
+ blockNo == noOfBlocks - 1 ? testMeta.dataFile.length() : (blockNo + 1) * blockSize,
+ blockNo == noOfBlocks - 1, blockNo - 1, testMeta.dataFile.length());
+ testMeta.blockReader.blocksMetadataInput.process(blockMetadata);
+ }
+ }
+
+ testMeta.blockReader.endWindow();
+
+ List<Object> messages = testMeta.messageSink.collectedTuples;
+ Assert.assertEquals("No of records", testMeta.messages.size(), messages.size());
+
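+    // Blocks arrived out of order, so sort both the expected and the actual
+    // records before comparing them pairwise.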
+ Collections.sort(testMeta.messages, new Comparator<String[]>()
+ {
+ @Override
+ public int compare(String[] rec1, String[] rec2)
+ {
+ return compareStringArrayRecords(rec1, rec2);
+ }
+ });
+
+ Collections.sort(messages, new Comparator<Object>()
+ {
+ @Override
+ public int compare(Object object1, Object object2)
+ {
+ String[] rec1 = new String((byte[])object1).split(",");
+ String[] rec2 = new String((byte[])object2).split(",");
+ return compareStringArrayRecords(rec1, rec2);
+ }
+ });
+ for (int i = 0; i < messages.size(); i++) {
+ byte[] msg = (byte[])messages.get(i);
+ Assert.assertTrue("line " + i,
+ Arrays.equals(new String(Arrays.copyOf(msg, msg.length - 1)).split(","), testMeta.messages.get(i)));
+ }
+ }
+
+  /**
+   * Utility function to lexicographically compare two records represented as
+   * string arrays.
+   *
+   * @param rec1 first record
+   * @param rec2 second record
+   * @return negative if rec1 < rec2, positive if rec1 > rec2, 0 otherwise
+   */
+ private int compareStringArrayRecords(String[] rec1, String[] rec2)
+ {
+ for (int i = 0; i < rec1.length && i < rec2.length; i++) {
+ if (rec1[i].equals(rec2[i])) {
+ continue;
+ }
+ return rec1[i].compareTo(rec2[i]);
+ }
+ return 0;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3FixedWidthRecordReaderTest.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java
new file mode 100644
index 0000000..bca3f33
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderMockTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderModule;
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.DelimitedValidator;
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.FixedWidthValidator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
+import com.datatorrent.lib.io.block.ReaderContext;
+import com.datatorrent.lib.io.fs.S3BlockReader;
+
+import static org.mockito.Mockito.mock;
+
+public class S3RecordReaderMockTest
+{
+ private String inputDir;
+ static String outputDir;
+ private static final String FILE_1 = "file1.txt";
+ private static final String FILE_2 = "file2.txt";
+ private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
+ private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";
+
+ public static class TestMeta extends TestWatcher
+ {
+ public String baseDirectory;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+ }
+
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Before
+ public void setup() throws Exception
+ {
+ inputDir = testMeta.baseDirectory + File.separator + "input";
+
+ FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_1), FILE_1_DATA);
+ FileUtils.writeStringToFile(new File(inputDir + File.separator + FILE_2), FILE_2_DATA);
+ }
+
+ @Test
+ public void testDelimitedRecords() throws Exception
+ {
+
+ DelimitedApplication app = new DelimitedApplication();
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
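+    // A 3-byte blockSize forces records to span multiple blocks.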
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");
+
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+
+ Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+ expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+ while (DelimitedValidator.getRecords().size() != expectedRecords.size()) {
+ LOG.debug("Waiting for app to finish");
+ Thread.sleep(1000);
+ }
+ lc.shutdown();
+ Assert.assertEquals(expectedRecords, DelimitedValidator.getRecords());
+
+ }
+
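+  /**
+   * Mocks out the S3 interaction: the AmazonS3 client is a Mockito mock and the
+   * overridden readData() implementations read from a local filesystem stream,
+   * so the tests run without S3 connectivity.
+   */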
+ private static class S3RecordReaderMock extends S3RecordReader
+ {
+ AmazonS3 s3ClientObject;
+
+ @Override
+ protected FSDataInputStream setupStream(FileBlockMetadata block) throws IOException
+ {
+ super.setupStream(block);
+ return fs.open(new Path(block.getFilePath()));
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ s3ClientObject = mock(AmazonS3.class);
+ super.setup(context);
+ }
+
+ @Override
+ protected ReaderContext<FSDataInputStream> createDelimitedReaderContext()
+ {
+ S3DelimitedRecordReaderContextMock s3DelimitedRecordReaderContextMock = new S3DelimitedRecordReaderContextMock();
+ s3DelimitedRecordReaderContextMock.getS3Params().setBucketName("S3RecordReaderMock");
+ s3DelimitedRecordReaderContextMock.getS3Params().setS3Client(s3ClientObject);
+ return s3DelimitedRecordReaderContextMock;
+ }
+
+ @Override
+ protected ReaderContext<FSDataInputStream> createFixedWidthReaderContext()
+ {
+ S3FixedWidthRecordReaderContextMock s3FixedWidthRecordReaderContextMock = new S3FixedWidthRecordReaderContextMock();
+ s3FixedWidthRecordReaderContextMock.getS3Params().setBucketName("S3RecordReaderMock");
+ s3FixedWidthRecordReaderContextMock.getS3Params().setS3Client(s3ClientObject);
+ s3FixedWidthRecordReaderContextMock.setLength(this.getRecordLength());
+ return s3FixedWidthRecordReaderContextMock;
+ }
+
+ private class S3DelimitedRecordReaderContextMock extends S3DelimitedRecordReaderContext
+ {
+ @Override
+ protected int readData(long bytesFromCurrentOffset, int bytesToFetch) throws IOException
+ {
+ if (buffer == null) {
+ buffer = new byte[bytesToFetch];
+ }
+ return stream.read(offset + bytesFromCurrentOffset, buffer, 0, bytesToFetch);
+ }
+ }
+
+ private static class S3FixedWidthRecordReaderContextMock extends S3FixedWidthRecordReaderContext
+ {
+ @Override
+ protected int readData(long startOffset, long endOffset) throws IOException
+ {
+      int bufferSize = (int)(endOffset - startOffset + 1);
+ if (buffer == null) {
+ buffer = new byte[bufferSize];
+ }
+ return stream.read(startOffset, buffer, 0, bufferSize);
+ }
+ }
+ }
+
+ private static class S3RecordReaderModuleMock extends S3RecordReaderModule
+ {
+ @Override
+ public S3RecordReader createRecordReader()
+ {
+ S3RecordReader s3RecordReader = new S3RecordReaderMock();
+ s3RecordReader.setBucketName(S3BlockReader.extractBucket(getFiles()));
+ s3RecordReader.setAccessKey("****");
+ s3RecordReader.setSecretAccessKey("*****");
+ s3RecordReader.setMode(this.getMode().toString());
+ s3RecordReader.setRecordLength(this.getRecordLength());
+ return s3RecordReader;
+ }
+ }
+
+ private static class DelimitedApplication implements StreamingApplication
+ {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+ {
+
+ S3RecordReaderModuleMock recordReader = dag.addModule("S3RecordReaderModuleMock", new S3RecordReaderModuleMock());
+ recordReader.setMode("delimited_record");
+ DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
+ dag.addStream("records", recordReader.records, validator.data);
+ }
+
+ }
+
+ @Test
+ public void testFixedWidthRecords() throws Exception
+ {
+
+ FixedWidthApplication app = new FixedWidthApplication();
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.recordLength", "8");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");
+
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+ LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+ lc.shutdown();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMissingRecordLength() throws Exception
+ {
+ FixedWidthApplication app = new FixedWidthApplication();
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.files", inputDir);
+ //Should give IllegalArgumentException since recordLength is not set
+    //conf.set("dt.operator.S3RecordReaderModuleMock.prop.recordLength", "8");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.blocksThreshold", "1");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.blockSize", "3");
+ conf.set("dt.operator.S3RecordReaderModuleMock.prop.scanIntervalMillis", "10000");
+
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+ LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+ lc.shutdown();
+ }
+
+ private static class FixedWidthApplication implements StreamingApplication
+ {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+ {
+ FSRecordReaderModule recordReader = dag.addModule("S3RecordReaderModuleMock", FSRecordReaderModule.class);
+ recordReader.setMode("FIXED_WIDTH_RECORD");
+ FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
+ dag.addStream("records", recordReader.records, validator.data);
+ }
+
+ }
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3RecordReaderMockTest.class);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0500e0ea/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java
new file mode 100644
index 0000000..7d69934
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3RecordReaderModuleAppTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.DelimitedValidator;
+import org.apache.apex.malhar.lib.fs.FSRecordReaderTest.FixedWidthValidator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+@Ignore
+public class S3RecordReaderModuleAppTest
+{
+ private String inputDir;
+ private static final String FILE_1 = "file1.txt";
+ private static final String FILE_2 = "file2.txt";
+ private static final String FILE_1_DATA = "1234\n567890\nabcde\nfgh\ni\njklmop";
+ private static final String FILE_2_DATA = "qr\nstuvw\nxyz\n";
+
+ private final String accessKey = "*************";
+ private final String secretKey = "*********************";
+ private AmazonS3 client;
+ private String files;
+ private static final String SCHEME = "s3n";
+
+ public static class TestMeta extends TestWatcher
+ {
+ public String baseDirectory;
+ public String bucketKey;
+
+ @Override
+ protected void starting(org.junit.runner.Description description)
+ {
+ this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+      this.bucketKey = ("target-" + description.getMethodName()).toLowerCase();
+ }
+ }
+
+ @Rule
+ public S3RecordReaderModuleAppTest.TestMeta testMeta = new S3RecordReaderModuleAppTest.TestMeta();
+
+ @Before
+ public void setup() throws Exception
+ {
+ client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+ client.createBucket(testMeta.bucketKey);
+ inputDir = testMeta.baseDirectory + File.separator + "input";
+
+ File file1 = new File(inputDir + File.separator + FILE_1);
+ File file2 = new File(inputDir + File.separator + FILE_2);
+
+ FileUtils.writeStringToFile(file1, FILE_1_DATA);
+ FileUtils.writeStringToFile(file2, FILE_2_DATA);
+
+ client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_1, file1));
+ client.putObject(new PutObjectRequest(testMeta.bucketKey, "input/" + FILE_2, file2));
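+    // s3n URI with embedded credentials: s3n://<accessKey>:<secretKey>@<bucket>/input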
+ files = SCHEME + "://" + accessKey + ":" + secretKey + "@" + testMeta.bucketKey + "/input";
+ }
+
+ @Test
+ public void testS3DelimitedRecords() throws Exception
+ {
+
+ S3DelimitedApplication app = new S3DelimitedApplication();
+ LocalMode lma = LocalMode.newInstance();
+
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.s3RecordReaderModule.prop.files", files);
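+    // A small blockSize exercises records that span block boundaries.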
+ conf.set("dt.operator.s3RecordReaderModule.prop.blockSize", "10");
+ conf.set("dt.operator.s3RecordReaderModule.prop.overflowBlockSize", "4");
+ conf.set("dt.operator.s3RecordReaderModule.prop.scanIntervalMillis", "10000");
+
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+
+ Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+ expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+ while (DelimitedValidator.getRecords().size() != expectedRecords.size()) {
+ LOG.debug("Waiting for app to finish");
+ Thread.sleep(1000);
+ }
+ lc.shutdown();
+ Assert.assertEquals(expectedRecords, DelimitedValidator.getRecords());
+
+ }
+
+ private static class S3DelimitedApplication implements StreamingApplication
+ {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+ {
+ S3RecordReaderModule recordReader = dag.addModule("s3RecordReaderModule", S3RecordReaderModule.class);
+ DelimitedValidator validator = dag.addOperator("Validator", new DelimitedValidator());
+ dag.addStream("records", recordReader.records, validator.data);
+ }
+
+ }
+
+ @Test
+ public void testS3FixedWidthRecords() throws Exception
+ {
+
+ S3FixedWidthApplication app = new S3FixedWidthApplication();
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.S3RecordReaderModule.prop.files", files);
+ conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8");
+ conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1");
+ conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000");
+
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+ LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+ lc.shutdown();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testS3MissingRecordLength() throws Exception
+ {
+ S3FixedWidthApplication app = new S3FixedWidthApplication();
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.S3RecordReaderModule.prop.files", files);
+ //Should give IllegalArgumentException since recordLength is not set
+ //conf.set("dt.operator.S3RecordReaderModule.prop.recordLength", "8");
+ conf.set("dt.operator.S3RecordReaderModule.prop.blocksThreshold", "1");
+ conf.set("dt.operator.S3RecordReaderModule.prop.scanIntervalMillis", "10000");
+
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+ lc.runAsync();
+ LOG.debug("Waiting for app to finish");
+    Thread.sleep(1000);
+ lc.shutdown();
+ }
+
+ private static class S3FixedWidthApplication implements StreamingApplication
+ {
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+ {
+ S3RecordReaderModule recordReader = dag.addModule("S3RecordReaderModule", S3RecordReaderModule.class);
+ recordReader.setMode("fixed_width_record");
+ FixedWidthValidator validator = dag.addOperator("Validator", new FixedWidthValidator());
+ dag.addStream("records", recordReader.records, validator.data);
+ }
+
+ }
+
+ @After
+ public void tearDown() throws IOException
+ {
+ FileUtils.deleteDirectory(new File(inputDir));
+ deleteBucketAndContent();
+ }
+
+ public void deleteBucketAndContent()
+ {
+ //Get the list of objects
+ ObjectListing objectListing = client.listObjects(testMeta.bucketKey);
+      for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+ LOG.info("Deleting an object: {}", objectSummary.getKey());
+ client.deleteObject(testMeta.bucketKey, objectSummary.getKey());
+ }
+ client.deleteBucket(testMeta.bucketKey);
+ }
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3RecordReaderModuleAppTest.class);
+}