Posted to commits@apex.apache.org by bh...@apache.org on 2016/12/01 05:28:54 UTC

apex-malhar git commit: APEXMALHAR-2022 Development of S3 Output Module

Repository: apex-malhar
Updated Branches:
  refs/heads/master 243dceb56 -> a5e8fa3fa


APEXMALHAR-2022 Development of S3 Output Module


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a5e8fa3f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a5e8fa3f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a5e8fa3f

Branch: refs/heads/master
Commit: a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e
Parents: 243dceb
Author: chaitanya <ch...@apache.org>
Authored: Wed Nov 30 10:47:36 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Wed Nov 30 16:31:42 2016 +0530

----------------------------------------------------------------------
 .../malhar/lib/fs/s3/S3BlockUploadOperator.java | 538 +++++++++++++++++++
 .../apex/malhar/lib/fs/s3/S3FileMerger.java     | 302 +++++++++++
 .../lib/fs/s3/S3InitiateFileUploadOperator.java | 378 +++++++++++++
 .../apex/malhar/lib/fs/s3/S3OutputModule.java   | 325 +++++++++++
 .../fs/s3/S3InitiateFileUploadOperatorTest.java | 119 ++++
 .../lib/fs/s3/S3OutputModuleMockTest.java       | 171 ++++++
 .../malhar/lib/fs/s3/S3OutputTestModule.java    |  72 +++
 7 files changed, 1905 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java
new file mode 100644
index 0000000..aafa3a7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This operator uploads a block into an S3 bucket using either the multi-part upload feature or the putObject API.
+ * The multi-part feature is used only if the file consists of more than one block; a single-block file is
+ * uploaded with a plain putObject call.
+ * This operator is intended to be used as part of the S3 Output Module.
+ */
+
+@InterfaceStability.Evolving
+public class S3BlockUploadOperator implements Operator, Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+  private static final Logger LOG = LoggerFactory.getLogger(S3BlockUploadOperator.class);
+  @NotNull
+  private String bucketName;
+  @NotNull
+  private String accessKey;
+  @NotNull
+  private String secretAccessKey;
+  private String endPoint;
+  private Map<String, S3BlockMetaData> blockInfo = new HashMap<>();
+  private transient Map<Long, String> blockIdToFilePath = new HashMap<>();
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  protected transient AmazonS3 s3Client;
+  private transient long currentWindowId;
+  private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;
+  private transient Map<String, UploadBlockMetadata> currentWindowRecoveryState;
+  public final transient DefaultOutputPort<UploadBlockMetadata> output = new DefaultOutputPort<>();
+
+  /**
+   * This input port receives incoming tuples (block data).
+   */
+  public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockInput = new DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>>()
+  {
+    @Override
+    public void process(AbstractBlockReader.ReaderRecord<Slice> tuple)
+    {
+      uploadBlockIntoS3(tuple);
+    }
+  };
+
+  /**
+   * Input port to receive block metadata
+   */
+  public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
+  {
+    @Override
+    public void process(BlockMetadata.FileBlockMetadata blockMetadata)
+    {
+      if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+        return;
+      }
+      blockIdToFilePath.put(blockMetadata.getBlockId(), blockMetadata.getFilePath());
+      LOG.debug("received blockId {} for file {} ", blockMetadata.getBlockId(), blockMetadata.getFilePath());
+    }
+  };
+
+  /**
+   * Input port to receive upload file meta data.
+   */
+  public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> uploadMetadataInput = new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>()
+  {
+    @Override
+    public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+    {
+      processUploadFileMetadata(tuple);
+    }
+  };
+
+  /**
+   * Convert each block of a given file into S3BlockMetaData
+   * @param tuple UploadFileMetadata
+   */
+  protected void processUploadFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+  {
+    long[] blocks = tuple.getFileMetadata().getBlockIds();
+    String filePath = tuple.getFileMetadata().getFilePath();
+    for (int i = 0; i < blocks.length; i++) {
+      String blockId = getUniqueBlockIdFromFile(blocks[i], filePath);
+      if (blockInfo.get(blockId) != null) {
+        break;
+      }
+      blockInfo.put(blockId, new S3BlockMetaData(tuple.getKeyName(), tuple.getUploadId(), i + 1));
+    }
+    if (blocks.length > 0) {
+      blockInfo.get(getUniqueBlockIdFromFile(blocks[blocks.length - 1], filePath)).setLastBlock(true);
+    }
+  }
+
+  /**
+   * Construct the unique block id from the given block id and file path.
+   * @param blockId Id of the block
+   * @param filepath given filepath
+   * @return unique block id
+   */
+  public static String getUniqueBlockIdFromFile(long blockId, String filepath)
+  {
+    return Long.toString(blockId) + "|" + filepath;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    waitingTuples = new ArrayList<>();
+    currentWindowRecoveryState = new HashMap<>();
+    windowDataManager.setup(context);
+    s3Client = createClient();
+  }
+
+  /**
+   * Create AmazonS3 client using AWS credentials
+   * @return AmazonS3
+   */
+  protected AmazonS3 createClient()
+  {
+    AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+    if (endPoint != null) {
+      client.setEndpoint(endPoint);
+    }
+    return client;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+      replay(windowId);
+    }
+  }
+
+  /**
+   * Replay the state.
+   * @param windowId replay window Id
+   */
+  protected void replay(long windowId)
+  {
+    try {
+      @SuppressWarnings("unchecked")
+      Map<String, UploadBlockMetadata> recoveredData = (Map<String, UploadBlockMetadata>)windowDataManager.retrieve(windowId);
+      if (recoveredData == null) {
+        return;
+      }
+      for (Map.Entry<String, UploadBlockMetadata> uploadBlockMetadata: recoveredData.entrySet()) {
+        output.emit(uploadBlockMetadata.getValue());
+        blockInfo.remove(uploadBlockMetadata.getKey());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (waitingTuples.size() > 0) {
+      processWaitBlocks();
+    }
+
+    for (String uniqueblockId : currentWindowRecoveryState.keySet()) {
+      long blockId = Long.parseLong(uniqueblockId.substring(0, uniqueblockId.indexOf("|")));
+      LOG.debug("Successfully uploaded block {}", blockId);
+      blockIdToFilePath.remove(blockId);
+      blockInfo.remove(uniqueblockId);
+    }
+
+    if (blockIdToFilePath.size() > 0) {
+      for (Long blockId : blockIdToFilePath.keySet()) {
+        LOG.info("Unable to upload block {}", blockId);
+      }
+      blockIdToFilePath.clear();
+    }
+
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+      try {
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to save recovery", e);
+      }
+    }
+    currentWindowRecoveryState.clear();
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowDataManager.teardown();
+  }
+
+  /**
+   * Processes the blocks that are waiting for their block or file metadata.
+   */
+  private void processWaitBlocks()
+  {
+    Iterator<AbstractBlockReader.ReaderRecord<Slice>> waitIterator = waitingTuples.iterator();
+
+    while (waitIterator.hasNext()) {
+      AbstractBlockReader.ReaderRecord<Slice> blockData = waitIterator.next();
+      String filePath = blockIdToFilePath.get(blockData.getBlockId());
+      if (filePath != null && blockInfo.get(getUniqueBlockIdFromFile(blockData.getBlockId(), filePath)) != null) {
+        uploadBlockIntoS3(blockData);
+        waitIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Upload the block into S3 bucket.
+   * @param tuple block data
+   */
+  protected void uploadBlockIntoS3(AbstractBlockReader.ReaderRecord<Slice> tuple)
+  {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+      return;
+    }
+    // Check whether the block metadata is present for this block
+    if (blockIdToFilePath.get(tuple.getBlockId()) == null) {
+      if (!waitingTuples.contains(tuple)) {
+        waitingTuples.add(tuple);
+      }
+      return;
+    }
+    String uniqueBlockId = getUniqueBlockIdFromFile(tuple.getBlockId(), blockIdToFilePath.get(tuple.getBlockId()));
+    S3BlockMetaData metaData = blockInfo.get(uniqueBlockId);
+    // Check whether the file metadata is received
+    if (metaData == null) {
+      if (!waitingTuples.contains(tuple)) {
+        waitingTuples.add(tuple);
+      }
+      return;
+    }
+    long partSize = tuple.getRecord().length;
+    PartETag partETag = null;
+    ByteArrayInputStream bis = new ByteArrayInputStream(tuple.getRecord().buffer);
+    // Check whether this is the only block of the file
+    if (metaData.isLastBlock && metaData.partNo == 1) {
+      ObjectMetadata omd = createObjectMetadata();
+      omd.setContentLength(partSize);
+      PutObjectResult result = s3Client.putObject(new PutObjectRequest(bucketName, metaData.getKeyName(), bis, omd));
+      partETag = new PartETag(1, result.getETag());
+    } else {
+      // Otherwise upload the block using the multi-part feature
+      try {
+        // Create request to upload a part.
+        UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName).withKey(metaData.getKeyName())
+            .withUploadId(metaData.getUploadId()).withPartNumber(metaData.getPartNo()).withInputStream(bis).withPartSize(partSize);
+        partETag =  s3Client.uploadPart(uploadRequest).getPartETag();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    UploadBlockMetadata uploadmetadata = new UploadBlockMetadata(partETag, metaData.getKeyName());
+    output.emit(uploadmetadata);
+    currentWindowRecoveryState.put(uniqueBlockId, uploadmetadata);
+    try {
+      bis.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the empty object metadata used for the put object request.
+   * @return the ObjectMetadata
+   */
+  public ObjectMetadata createObjectMetadata()
+  {
+    return new ObjectMetadata();
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.committed(windowId);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    if (waitingTuples.size() > 0) {
+      processWaitBlocks();
+    }
+  }
+
+  /**
+   * Upload block metadata consists of partETag and key name.
+   */
+  public static class UploadBlockMetadata
+  {
+    @FieldSerializer.Bind(JavaSerializer.class)
+    private PartETag partETag;
+    private String keyName;
+
+    // For Kryo
+    public UploadBlockMetadata()
+    {
+    }
+
+    public UploadBlockMetadata(PartETag partETag, String keyName)
+    {
+      this.partETag = partETag;
+      this.keyName = keyName;
+    }
+
+    /**
+     * Get the partETag of the block
+     * @return the partETag
+     */
+    public PartETag getPartETag()
+    {
+      return partETag;
+    }
+
+    /**
+     * Return the key name of the file
+     * @return key name
+     */
+    public String getKeyName()
+    {
+      return keyName;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return keyName.hashCode();
+    }
+  }
+
+  /**
+   * S3 block metadata consists of the key name, upload id, part number and whether the block is the last block of the file.
+   */
+  public static class S3BlockMetaData
+  {
+    private String keyName;
+    private String uploadId;
+    private Integer partNo;
+    private boolean isLastBlock;
+
+    // For Kryo Serialization
+    public S3BlockMetaData()
+    {
+    }
+
+    public S3BlockMetaData(String keyName, String uploadId, Integer partNo)
+    {
+      this.keyName = keyName;
+      this.uploadId = uploadId;
+      this.partNo = partNo;
+      this.isLastBlock = false;
+    }
+
+    /**
+     * Return the key name of the file
+     * @return key name
+     */
+    public String getKeyName()
+    {
+      return keyName;
+    }
+
+    /**
+     * Return the upload id of the block
+     * @return the upload id
+     */
+    public String getUploadId()
+    {
+      return uploadId;
+    }
+
+    /**
+     * Return the part number of the block
+     * @return the part number
+     */
+    public Integer getPartNo()
+    {
+      return partNo;
+    }
+
+    /**
+     * Returns whether this block is the last block of the file.
+     * @return isLastBlock
+     */
+    public boolean isLastBlock()
+    {
+      return isLastBlock;
+    }
+
+    /**
+     * Sets whether this block is the last block of the file.
+     * @param lastBlock true if this is the last block of the file
+     */
+    public void setLastBlock(boolean lastBlock)
+    {
+      isLastBlock = lastBlock;
+    }
+  }
+
+  /**
+   * Returns the name of the bucket in which to upload the blocks.
+   * @return bucket name
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Sets the name of the bucket in which to upload the blocks.
+   * @param bucketName bucket name
+   */
+  public void setBucketName(@NotNull String bucketName)
+  {
+    this.bucketName = Preconditions.checkNotNull(bucketName);
+  }
+
+  /**
+   * Return the AWS access key
+   * @return access key
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Sets the AWS access key
+   * @param accessKey access key
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Return the AWS secret access key
+   * @return secret access key
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Sets the AWS secret access key
+   * @param secretAccessKey secret access key
+   */
+  public void setSecretAccessKey(@NotNull String secretAccessKey)
+  {
+    this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+  }
+
+  /**
+   * Return the AWS S3 end point
+   * @return S3 end point
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+
+  /**
+   * Sets the AWS S3 end point
+   * @param endPoint S3 end point
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = endPoint;
+  }
+}
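
For reference, the upload decision made in uploadBlockIntoS3() above can be read in isolation as
follows. This sketch is not part of the commit; the BlockUploadSketch class and its parameters are
hypothetical, and only AWS SDK calls already used by the operator appear in it.

    import java.io.ByteArrayInputStream;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.PutObjectRequest;
    import com.amazonaws.services.s3.model.UploadPartRequest;

    public class BlockUploadSketch
    {
      /**
       * Uploads one block of a file and returns its PartETag.
       * A single-block file is written with putObject; a block of a multi-block file is written as one
       * part of the previously initiated multi-part upload.
       */
      public static PartETag uploadBlock(AmazonS3 s3, String bucket, String key, String uploadId,
          int partNumber, boolean onlyBlockOfFile, byte[] block)
      {
        ByteArrayInputStream in = new ByteArrayInputStream(block);
        if (onlyBlockOfFile) {
          ObjectMetadata md = new ObjectMetadata();
          md.setContentLength(block.length);
          return new PartETag(1, s3.putObject(new PutObjectRequest(bucket, key, in, md)).getETag());
        }
        UploadPartRequest request = new UploadPartRequest().withBucketName(bucket).withKey(key)
            .withUploadId(uploadId).withPartNumber(partNumber).withInputStream(in).withPartSize(block.length);
        return s3.uploadPart(request).getPartETag();
      }
    }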

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java
new file mode 100644
index 0000000..96fbc29
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+/**
+ * This operator merges the uploaded S3 blocks into a file. It issues an S3 CompleteMultipartUploadRequest
+ * once all the blocks of a file have been uploaded using the multi-part feature.
+ * This operator is intended to be used as part of the S3 Output Module.
+ */
+
+@InterfaceStability.Evolving
+public class S3FileMerger implements Operator, Operator.CheckpointNotificationListener
+{
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileMerger.class);
+  @NotNull
+  private String bucketName;
+  @NotNull
+  private String accessKey;
+  @NotNull
+  private String secretAccessKey;
+  private String endPoint;
+  protected transient List<String> uploadedFiles = new ArrayList<>();
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Map<String, List<PartETag>> uploadParts = new HashMap<>();
+  private Map<String, S3InitiateFileUploadOperator.UploadFileMetadata> fileMetadatas = new HashMap<>();
+  protected transient long currentWindowId;
+  protected transient AmazonS3 s3Client;
+
+  /**
+   * Input port to receive UploadBlockMetadata
+   */
+  public final transient DefaultInputPort<S3BlockUploadOperator.UploadBlockMetadata> uploadMetadataInput = new DefaultInputPort<S3BlockUploadOperator.UploadBlockMetadata>()
+  {
+    @Override
+    public void process(S3BlockUploadOperator.UploadBlockMetadata tuple)
+    {
+      processUploadBlock(tuple);
+    }
+  };
+
+  /**
+   * Processes the uploaded block metadata and triggers the file merge if the file metadata has already been received.
+   * @param tuple uploaded block meta data
+   */
+  protected void processUploadBlock(S3BlockUploadOperator.UploadBlockMetadata tuple)
+  {
+    List<PartETag> listOfUploads = uploadParts.get(tuple.getKeyName());
+    if (listOfUploads == null) {
+      listOfUploads = new ArrayList<>();
+      uploadParts.put(tuple.getKeyName(), listOfUploads);
+    }
+    listOfUploads.add(tuple.getPartETag());
+    if (fileMetadatas.get(tuple.getKeyName()) != null) {
+      verifyAndEmitFileMerge(tuple.getKeyName());
+    }
+  }
+
+  /**
+   * Input port to receive UploadFileMetadata
+   */
+  public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> filesMetadataInput = new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>()
+  {
+    @Override
+    public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+    {
+      processFileMetadata(tuple);
+    }
+  };
+
+  /**
+   * Processes the file metadata and triggers the file merge if uploaded blocks of the file have already been received.
+   * @param tuple file metadata
+   */
+  protected void processFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+  {
+    String keyName = tuple.getKeyName();
+    fileMetadatas.put(keyName, tuple);
+    if (uploadParts.get(keyName) != null) {
+      verifyAndEmitFileMerge(keyName);
+    }
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    windowDataManager.setup(context);
+    s3Client = createClient();
+  }
+
+  /**
+   * Create AmazonS3 client using AWS credentials
+   * @return AmazonS3
+   */
+  protected AmazonS3 createClient()
+  {
+    AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+    if (endPoint != null) {
+      client.setEndpoint(endPoint);
+    }
+    return client;
+  }
+
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (uploadedFiles.size() > 0) {
+      for (String keyName: uploadedFiles) {
+        uploadParts.remove(keyName);
+        fileMetadatas.remove(keyName);
+      }
+      uploadedFiles.clear();
+    }
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+      try {
+        windowDataManager.save("Uploaded Files", currentWindowId);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to save recovery", e);
+      }
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowDataManager.teardown();
+  }
+
+  /**
+   * Sends the CompleteMultipartUploadRequest to S3 once all the blocks of a file have been uploaded.
+   * @param keyName file to upload into S3
+   */
+  private void verifyAndEmitFileMerge(String keyName)
+  {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+      return;
+    }
+    S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata = fileMetadatas.get(keyName);
+    List<PartETag> partETags = uploadParts.get(keyName);
+    if (partETags == null || uploadFileMetadata == null ||
+        uploadFileMetadata.getFileMetadata().getNumberOfBlocks() != partETags.size()) {
+      return;
+    }
+
+    if (partETags.size() <= 1) {
+      uploadedFiles.add(keyName);
+      LOG.debug("Uploaded file {} successfully", keyName);
+      return;
+    }
+
+    CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName,
+        keyName, uploadFileMetadata.getUploadId(), partETags);
+    CompleteMultipartUploadResult result = s3Client.completeMultipartUpload(compRequest);
+    if (result.getETag() != null) {
+      uploadedFiles.add(keyName);
+      LOG.debug("Uploaded file {} successfully", keyName);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.committed(windowId);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Return the name of the bucket in which to upload the files
+   * @return name of the bucket
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Sets the name of the bucket in which to upload the files.
+   * @param bucketName name of the bucket
+   */
+  public void setBucketName(@NotNull String bucketName)
+  {
+    this.bucketName = Preconditions.checkNotNull(bucketName);
+  }
+
+  /**
+   * Return the AWS access key
+   * @return the access key
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Sets the AWS access key
+   * @param accessKey AWS access key
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Returns the AWS secret access key
+   * @return AWS secret access key
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Sets the AWS secret access key
+   * @param secretAccessKey AWS secret access key
+   */
+  public void setSecretAccessKey(@NotNull String secretAccessKey)
+  {
+    this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+  }
+
+  /**
+   * Get the AWS S3 end point
+   * @return the AWS S3 end point
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+
+  /**
+   * Set the S3 end point
+   * @param endPoint end point
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = endPoint;
+  }
+}
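
The merge above completes a multi-part upload only when the number of collected PartETags equals the
file's block count; a single-part file was already written by putObject and needs no completion call.
A minimal sketch of the completion request, not part of the commit; FileMergeSketch and its parameters
are hypothetical.

    import java.util.List;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    import com.amazonaws.services.s3.model.PartETag;

    public class FileMergeSketch
    {
      /**
       * Completes a multi-part upload once all PartETags of the file have been collected.
       * Returns true if S3 acknowledged the merged object with an ETag.
       */
      public static boolean completeUpload(AmazonS3 s3, String bucket, String key, String uploadId,
          List<PartETag> partETags, int expectedBlocks)
      {
        if (partETags.size() != expectedBlocks) {
          return false; // not all blocks of the file have been uploaded yet
        }
        CompleteMultipartUploadResult result =
            s3.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
        return result.getETag() != null;
      }
    }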

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java
new file mode 100644
index 0000000..3a38265
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+
+/**
+ * This operator initiates a multi-part file upload into S3 and emits the resulting upload id.
+ * The upload is initiated only if the file consists of more than one block; single-block files are later
+ * uploaded with a plain putObject call and therefore need no upload id.
+ * This operator is intended to be used as part of the S3 Output Module.
+ */
+@InterfaceStability.Evolving
+public class S3InitiateFileUploadOperator implements Operator, Operator.CheckpointNotificationListener
+{
+  @NotNull
+  private String bucketName;
+  @NotNull
+  private String accessKey;
+  @NotNull
+  private String secretAccessKey;
+  private String endPoint;
+  @NotNull
+  private String outputDirectoryPath;
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  protected transient AmazonS3 s3Client;
+  protected transient long currentWindowId;
+  protected transient List<UploadFileMetadata> currentWindowRecoveryState;
+
+  public final transient DefaultOutputPort<UploadFileMetadata> fileMetadataOutput = new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<UploadFileMetadata> uploadMetadataOutput = new DefaultOutputPort<>();
+
+  /**
+   * This input port receives the metadata of files that will be uploaded into S3.
+   */
+  public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>()
+  {
+    @Override
+    public void process(AbstractFileSplitter.FileMetadata tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * For the input file, initiates the upload and emits the UploadFileMetadata through the fileMetadataOutput
+   * and uploadMetadataOutput ports.
+   * @param tuple given tuple
+   */
+  protected void processTuple(AbstractFileSplitter.FileMetadata tuple)
+  {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+      return;
+    }
+    String keyName = getKeyName(tuple.getFilePath());
+    String uploadId = "";
+    if (tuple.getNumberOfBlocks() > 1) {
+      InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, keyName);
+      initRequest.setObjectMetadata(createObjectMetadata());
+      InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
+      uploadId = initResponse.getUploadId();
+    }
+    UploadFileMetadata uploadFileMetadata = new UploadFileMetadata(tuple, uploadId, keyName);
+    fileMetadataOutput.emit(uploadFileMetadata);
+    uploadMetadataOutput.emit(uploadFileMetadata);
+    currentWindowRecoveryState.add(uploadFileMetadata);
+  }
+
+  /**
+   * Creates the empty object metadata for the initiate multipart upload request.
+   * @return the ObjectMetadata
+   */
+  public ObjectMetadata createObjectMetadata()
+  {
+    return new ObjectMetadata();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    outputDirectoryPath = StringUtils.removeEnd(outputDirectoryPath, Path.SEPARATOR);
+    currentWindowRecoveryState = new ArrayList<>();
+    windowDataManager.setup(context);
+    s3Client = createClient();
+  }
+
+  /**
+   * Create AmazonS3 client using AWS credentials
+   * @return AmazonS3
+   */
+  protected AmazonS3 createClient()
+  {
+    AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+    if (endPoint != null) {
+      client.setEndpoint(endPoint);
+    }
+    return client;
+  }
+
+  /**
+   * Generates the key name from the given file path and output directory path.
+   * @param filePath file path to upload
+   * @return key name for the given file
+   */
+  private String getKeyName(String filePath)
+  {
+    return outputDirectoryPath + Path.SEPARATOR + StringUtils.removeStart(filePath, Path.SEPARATOR);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+      replay(windowId);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+      try {
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to save recovery", e);
+      }
+    }
+    currentWindowRecoveryState.clear();
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowDataManager.teardown();
+  }
+
+  protected void replay(long windowId)
+  {
+    try {
+      @SuppressWarnings("unchecked")
+      List<UploadFileMetadata> recoveredData = (List<UploadFileMetadata>)windowDataManager.retrieve(windowId);
+      if (recoveredData != null) {
+        for (UploadFileMetadata upfm : recoveredData) {
+          uploadMetadataOutput.emit(upfm);
+          fileMetadataOutput.emit(upfm);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.committed(windowId);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Return the name of the bucket in which to create the multipart upload.
+   * @return bucket name
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the name of the bucket in which to create the multipart upload.
+   * @param bucketName bucket name
+   */
+  public void setBucketName(@NotNull String bucketName)
+  {
+    this.bucketName = Preconditions.checkNotNull(bucketName);
+  }
+
+  /**
+   * Return the AWS access key
+   * @return AWS access key
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Sets the AWS access key
+   * @param accessKey given access key
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Return the AWS secret access key
+   * @return the AWS secret access key
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Sets the AWS secret access key
+   * @param secretAccessKey secret access key
+   */
+  public void setSecretAccessKey(@NotNull String secretAccessKey)
+  {
+    this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+  }
+
+  /**
+   * Output directory path for the files to upload
+   * @return the output directory path
+   */
+  public String getOutputDirectoryPath()
+  {
+    return outputDirectoryPath;
+  }
+
+  /**
+   * Sets the output directory path for uploading new files.
+   * @param outputDirectoryPath output directory path
+   */
+  public void setOutputDirectoryPath(@NotNull String outputDirectoryPath)
+  {
+    this.outputDirectoryPath = Preconditions.checkNotNull(outputDirectoryPath);
+  }
+
+  /**
+   * Returns the window data manager.
+   * @return the windowDataManager
+   */
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
+  /**
+   * Sets the window data manager
+   * @param windowDataManager given windowDataManager
+   */
+  public void setWindowDataManager(@NotNull WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = Preconditions.checkNotNull(windowDataManager);
+  }
+
+  /**
+   * Returns the AWS S3 end point
+   * @return the S3 end point
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+
+  /**
+   * Sets the AWS S3 end point
+   * @param endPoint S3 end point
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = endPoint;
+  }
+
+  /**
+   * File upload metadata consisting of the file metadata, upload id and key name.
+   */
+  public static class UploadFileMetadata
+  {
+    private AbstractFileSplitter.FileMetadata fileMetadata;
+    private String uploadId;
+    private String keyName;
+
+    // For Kryo
+    public UploadFileMetadata()
+    {
+    }
+
+    public UploadFileMetadata(AbstractFileSplitter.FileMetadata fileMetadata, String uploadId, String keyName)
+    {
+      this.fileMetadata = fileMetadata;
+      this.uploadId = uploadId;
+      this.keyName = keyName;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return keyName.hashCode();
+    }
+
+    /**
+     * Returns the name of the key generated from file path.
+     * @return the key name
+     */
+    public String getKeyName()
+    {
+      return keyName;
+    }
+
+    /**
+     * Return the file metadata of a file.
+     * @return the fileMetadata
+     */
+    public AbstractFileSplitter.FileMetadata getFileMetadata()
+    {
+      return fileMetadata;
+    }
+
+    /**
+     * Returns the unique upload id of a file.
+     * @return the upload Id of a file
+     */
+    public String getUploadId()
+    {
+      return uploadId;
+    }
+  }
+}
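
S3InitiateFileUploadOperator issues the InitiateMultipartUploadRequest whose upload id the downstream
S3BlockUploadOperator and S3FileMerger later pass to uploadPart and completeMultipartUpload. A minimal
sketch of that initiation call, not part of the commit; the credentials, bucket and key below are
placeholders.

    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3Client;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;

    public class InitiateUploadSketch
    {
      public static void main(String[] args)
      {
        // Placeholder credentials; the operator builds the same client in createClient().
        AmazonS3 s3 = new AmazonS3Client(new BasicAWSCredentials("accessKey", "secretAccessKey"));
        // Initiate a multi-part upload for a file that spans more than one block.
        String uploadId = s3.initiateMultipartUpload(
            new InitiateMultipartUploadRequest("my-bucket", "output/dir/file1.txt")).getUploadId();
        System.out.println("upload id: " + uploadId);
      }
    }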

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java
new file mode 100644
index 0000000..6c3d8be
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+import static com.datatorrent.api.Context.OperatorContext.TIMEOUT_WINDOW_COUNT;
+
+/**
+ * S3OutputModule can be used to upload files or a directory into S3. This module supports
+ * parallel uploads of multiple blocks of the same file and merges those blocks in sequence.
+ *
+ * The following operators are wrapped into a single component using the Module API:
+ *  - S3InitiateFileUploadOperator
+ *  - S3BlockUploadOperator
+ *  - S3FileMerger
+ *
+ * Initial BenchMark Results
+ * -------------------------
+ * The module writes 18 MB/s to S3 using the multi-part upload feature with the following configuration:
+ *
+ * File Size = 1 GB
+ * Partition count of S3BlockUploadOperator = 6
+ * Partition count of S3FileMerger = 1
+ * Container memory sizes of the operators in this module are as follows:
+ *          S3InitiateFileUploadOperator  = 1 GB
+ *          S3BlockUploadOperator         = 2.5 GB
+ *          S3FileMerger                  = 2 GB
+ *
+ *
+ *  @displayName S3 Output Module
+ *  @tags S3, Output
+ */
+@InterfaceStability.Evolving
+public class S3OutputModule implements Module
+{
+  /**
+   * AWS access key
+   */
+  @NotNull
+  private String accessKey;
+  /**
+   * AWS secret access key
+   */
+  @NotNull
+  private String secretAccessKey;
+
+  /**
+   * S3 End point
+   */
+  private String endPoint;
+  /**
+   * Name of the bucket in which to upload the files
+   */
+  @NotNull
+  private String bucketName;
+
+  /**
+   * Path of the output directory. The relative path of the copied files is maintained with respect to
+   * the source directory and the output directory.
+   */
+  @NotNull
+  private String outputDirectoryPath;
+
+  /**
+   * Specified as a count of streaming windows. This value is set on the operators in this module because
+   * they mostly interact with Amazon S3, so their window ids might lag behind those of the upstream operators.
+   */
+  @Min(120)
+  private int timeOutWindowCount = 6000;
+
+  /**
+   * Number of partitions (instances) of the S3FileMerger operator.
+   */
+  @Min(1)
+  private int mergerCount = 1;
+  /**
+   * Input port for files metadata.
+   */
+  public final transient ProxyInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new ProxyInputPort<AbstractFileSplitter.FileMetadata>();
+
+  /**
+   * Input port for blocks metadata
+   */
+  public final transient ProxyInputPort<BlockMetadata.FileBlockMetadata> blocksMetadataInput = new ProxyInputPort<BlockMetadata.FileBlockMetadata>();
+
+  /**
+   * Input port for blocks data
+   */
+  public final transient ProxyInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockData = new ProxyInputPort<AbstractBlockReader.ReaderRecord<Slice>>();
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // DAG for S3 Output Module as follows:
+    //   ---- S3InitiateFileUploadOperator -----|
+    //             |                    S3FileMerger
+    //   ----  S3BlockUploadOperator ------------|
+
+    S3InitiateFileUploadOperator initiateUpload = dag.addOperator("InitiateUpload", createS3InitiateUpload());
+    initiateUpload.setAccessKey(accessKey);
+    initiateUpload.setSecretAccessKey(secretAccessKey);
+    initiateUpload.setBucketName(bucketName);
+    initiateUpload.setOutputDirectoryPath(outputDirectoryPath);
+
+    S3BlockUploadOperator blockUploader = dag.addOperator("BlockUpload", createS3BlockUpload());
+    blockUploader.setAccessKey(accessKey);
+    blockUploader.setSecretAccessKey(secretAccessKey);
+    blockUploader.setBucketName(bucketName);
+
+    S3FileMerger fileMerger = dag.addOperator("FileMerger", createS3FileMerger());
+    fileMerger.setAccessKey(accessKey);
+    fileMerger.setSecretAccessKey(secretAccessKey);
+    fileMerger.setBucketName(bucketName);
+
+    if (endPoint != null) {
+      initiateUpload.setEndPoint(endPoint);
+      blockUploader.setEndPoint(endPoint);
+      fileMerger.setEndPoint(endPoint);
+    }
+
+    dag.setInputPortAttribute(blockUploader.blockInput, Context.PortContext.PARTITION_PARALLEL, true);
+    dag.setInputPortAttribute(blockUploader.blockMetadataInput, Context.PortContext.PARTITION_PARALLEL, true);
+
+    dag.setAttribute(initiateUpload, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+    dag.setAttribute(blockUploader, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+    dag.setAttribute(fileMerger, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+    dag.setUnifierAttribute(blockUploader.output, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+
+    dag.setAttribute(fileMerger,Context.OperatorContext.PARTITIONER, new StatelessPartitioner<S3FileMerger>(mergerCount));
+    // Add Streams
+    dag.addStream("InitiateUploadIDToMerger", initiateUpload.fileMetadataOutput, fileMerger.filesMetadataInput);
+    dag.addStream("InitiateUploadIDToWriter", initiateUpload.uploadMetadataOutput, blockUploader.uploadMetadataInput);
+    dag.addStream("WriterToMerger", blockUploader.output, fileMerger.uploadMetadataInput);
+
+    // Set the proxy ports
+    filesMetadataInput.set(initiateUpload.filesMetadataInput);
+    blocksMetadataInput.set(blockUploader.blockMetadataInput);
+    blockData.set(blockUploader.blockInput);
+  }
+
+  /**
+   * Create the S3InitiateFileUploadOperator for initiate upload
+   * @return S3InitiateFileUploadOperator
+   */
+  protected S3InitiateFileUploadOperator createS3InitiateUpload()
+  {
+    return new S3InitiateFileUploadOperator();
+  }
+
+  /**
+   * Create the S3BlockUploadOperator for block upload into S3 bucket
+   * @return S3BlockUploadOperator
+   */
+  protected S3BlockUploadOperator createS3BlockUpload()
+  {
+    return new S3BlockUploadOperator();
+  }
+
+  /**
+   * Create the S3FileMerger for sending the complete multipart upload request
+   * @return S3FileMerger
+   */
+  protected S3FileMerger createS3FileMerger()
+  {
+    return new S3FileMerger();
+  }
+
+  /**
+   * Get the AWS access key
+   * @return AWS access key
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set the AWS access key
+   * @param accessKey access key
+   */
+  public void setAccessKey(String accessKey)
+  {
+    this.accessKey = accessKey;
+  }
+
+  /**
+   * Return the AWS secret access key
+   * @return AWS secret access key
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Set the AWS secret access key
+   * @param secretAccessKey AWS secret access key
+   */
+  public void setSecretAccessKey(String secretAccessKey)
+  {
+    this.secretAccessKey = secretAccessKey;
+  }
+
+
+  /**
+   * Get the name of the bucket in which to upload the files
+   * @return bucket name
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the name of the bucket in which to upload the files
+   * @param bucketName name of the bucket
+   */
+  public void setBucketName(String bucketName)
+  {
+    this.bucketName = bucketName;
+  }
+
+  /**
+   * Return the S3 End point
+   * @return S3 End point
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+
+  /**
+   * Set the S3 End point
+   * @param endPoint S3 end point
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = endPoint;
+  }
+
+  /**
+   * Get the path of the output directory.
+   * @return path of output directory
+   */
+  public String getOutputDirectoryPath()
+  {
+    return outputDirectoryPath;
+  }
+
+  /**
+   * Set the path of the output directory.
+   * @param outputDirectoryPath path of output directory
+   */
+  public void setOutputDirectoryPath(String outputDirectoryPath)
+  {
+    this.outputDirectoryPath = outputDirectoryPath;
+  }
+
+  /**
+   * Get the timeout, as a number of streaming windows, for operators that have stalled processing.
+   * @return the number of streaming windows
+   */
+  public int getTimeOutWindowCount()
+  {
+    return timeOutWindowCount;
+  }
+
+  /**
+   * Set the timeout as a number of streaming windows.
+   * @param timeOutWindowCount number of streaming windows used as the timeout
+   */
+  public void setTimeOutWindowCount(int timeOutWindowCount)
+  {
+    this.timeOutWindowCount = timeOutWindowCount;
+  }
+
+  /**
+   * Get the partition count of S3FileMerger operator
+   * @return the partition count
+   */
+  public int getMergerCount()
+  {
+    return mergerCount;
+  }
+
+  /**
+   * Set the partition count of S3FileMerger Operator
+   * @param mergerCount given partition count
+   */
+  public void setMergerCount(int mergerCount)
+  {
+    this.mergerCount = mergerCount;
+  }
+}
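
One possible way to wire the module into an application, assuming an upstream FSInputModule whose
filesMetadataOutput, blocksMetadataOutput and messages ports supply file metadata, block metadata and
block data; the application class, operator names and property values below are illustrative and not
part of the commit.

    import org.apache.apex.malhar.lib.fs.s3.S3OutputModule;
    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.lib.io.fs.FSInputModule;

    public class S3CopyApplication implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
        S3OutputModule outputModule = dag.addModule("S3OutputModule", new S3OutputModule());

        // Credentials, bucket and output path would normally come from configuration properties.
        outputModule.setAccessKey("accessKey");
        outputModule.setSecretAccessKey("secretAccessKey");
        outputModule.setBucketName("my-bucket");
        outputModule.setOutputDirectoryPath("output/dir");

        // Connect file metadata, block metadata and block data to the module's proxy input ports.
        dag.addStream("FileMetadata", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);
        dag.addStream("BlockMetadata", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput);
        dag.addStream("BlockData", inputModule.messages, outputModule.blockData);
      }
    }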

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java
new file mode 100644
index 0000000..a077fb9
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the S3InitiateFileUploadOperator. It verifies that the upload id emitted by the operator matches
+ * the upload id returned by the mocked S3 client.
+ */
+public class S3InitiateFileUploadOperatorTest
+{
+  private String uploadId = "uploadfile1";
+  private static final String APPLICATION_PATH_PREFIX = "target/s3outputtest/";
+  private String applicationPath;
+  private Attribute.AttributeMap.DefaultAttributeMap attributes;
+  private Context.OperatorContext context;
+  @Mock
+  public AmazonS3 client;
+  @Mock
+  public AbstractFileSplitter.FileMetadata fileMetadata;
+
+  public class S3InitiateFileUploadTest extends S3InitiateFileUploadOperator
+  {
+    @Override
+    protected AmazonS3 createClient()
+    {
+      return client;
+    }
+  }
+
+  @Before
+  public void beforeTest()
+  {
+    applicationPath =  OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+  }
+
+  @After
+  public void afterTest()
+  {
+    Path root = new Path(applicationPath);
+    try {
+      FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testInitiateUpload()
+  {
+    InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+    result.setUploadId(uploadId);
+
+    MockitoAnnotations.initMocks(this);
+    when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
+    when(fileMetadata.getFilePath()).thenReturn("/tmp/file1.txt");
+    when(fileMetadata.getNumberOfBlocks()).thenReturn(4);
+
+    S3InitiateFileUploadTest operator = new S3InitiateFileUploadTest();
+    operator.setBucketName("testbucket");
+    operator.setup(context);
+
+    CollectorTestSink<S3InitiateFileUploadOperator.UploadFileMetadata> fileSink = new CollectorTestSink<>();
+    CollectorTestSink<Object> tmp = (CollectorTestSink)fileSink;
+    operator.fileMetadataOutput.setSink(tmp);
+    operator.beginWindow(0);
+    operator.processTuple(fileMetadata);
+    operator.endWindow();
+
+    S3InitiateFileUploadOperator.UploadFileMetadata emitted = (S3InitiateFileUploadOperator.UploadFileMetadata)tmp.collectedTuples.get(0);
+    Assert.assertEquals("Upload ID :", uploadId, emitted.getUploadId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java
new file mode 100644
index 0000000..8fe5ef9
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.fs.FSInputModule;
+import com.datatorrent.stram.StramLocalCluster;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies the S3OutputModule by running it in a local application. The application reads data
+ * from the local file system "input" directory and uploads the resulting files into the
+ * "output" directory through a mocked AmazonS3 client.
+ */
+public class S3OutputModuleMockTest
+{
+  private String uploadId = "uploadfile";
+  private static final String APPLICATION_PATH_PREFIX = "target/s3outputmocktest/";
+  private static final String FILE_DATA = "Testing the S3OutputModule. This File has more data hence more blocks.";
+  private static final String FILE = "file.txt";
+  private String inputDir;
+  private String outputDir;
+  private String applicationPath;
+  private File inputFile;
+  @Mock
+  public static AmazonS3 client;
+
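+  // Creates a unique application directory with a single input file that the module uploads.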
+  @Before
+  public void beforeTest() throws IOException
+  {
+    applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    inputDir = applicationPath + File.separator + "input";
+    outputDir = applicationPath + File.separator + "output";
+    inputFile = new File(inputDir + File.separator + FILE);
+    FileUtils.writeStringToFile(inputFile, FILE_DATA);
+  }
+
+  @After
+  public void afterTest()
+  {
+    Path root = new Path(applicationPath);
+    try {
+      FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
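+  /**
+   * Simulates a successful multipart completion: copies the input file into the output
+   * directory and returns a canned result, so no call ever reaches S3.
+   */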
+  private CompleteMultipartUploadResult completeMultiPart() throws IOException
+  {
+    FileUtils.copyFile(inputFile, new File(outputDir + File.separator + FILE));
+    CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
+    result.setETag(outputDir);
+    return result;
+  }
+
+  @Test
+  public void testS3OutputModule() throws Exception
+  {
+    InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+    result.setUploadId(uploadId);
+
+    PutObjectResult objResult = new PutObjectResult();
+    objResult.setETag("SuccessFullyUploaded");
+
+    UploadPartResult partResult = new UploadPartResult();
+    partResult.setPartNumber(1);
+    partResult.setETag("SuccessFullyPartUploaded");
+
+    MockitoAnnotations.initMocks(this);
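+    // Stub the mocked AmazonS3 client so multipart initiation, part uploads, single-object
+    // puts and upload completion all return canned results instead of calling S3.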
+    when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
+    when(client.putObject(any(PutObjectRequest.class))).thenReturn(objResult);
+    when(client.uploadPart(any(UploadPartRequest.class))).thenReturn(partResult);
+    when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(completeMultiPart());
+
+    Application app = new S3OutputModuleMockTest.Application();
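+    // A 10-byte block size splits the ~70-byte test file into several blocks, so the
+    // multipart upload path of the module is exercised.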
+    Configuration conf = new Configuration();
+    conf.set("dt.operator.HDFSInputModule.prop.files", inputDir);
+    conf.set("dt.operator.HDFSInputModule.prop.blockSize", "10");
+    conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1");
+    conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","20");
+
+    conf.set("dt.operator.S3OutputModule.prop.accessKey", "accessKey");
+    conf.set("dt.operator.S3OutputModule.prop.secretAccessKey", "secretKey");
+    conf.set("dt.operator.S3OutputModule.prop.bucketName", "bucketKey");
+    conf.set("dt.operator.S3OutputModule.prop.outputDirectoryPath", outputDir);
+
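+    // Run the application in local mode and exit as soon as the uploaded file appears
+    // in the output directory.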
+    Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+    final Path outputFilePath = new Path(outDir.toString() + File.separator + FILE);
+    final FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(true);
+
+    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return fs.exists(outputFilePath);
+      }
+    });
+    lc.run(10000);
+
+    Assert.assertTrue("output file exist", fs.exists(outputFilePath));
+  }
+
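+  /**
+   * DAG under test: FSInputModule reads the input file and splits it into blocks;
+   * S3OutputTestModule uploads them through the mocked AmazonS3 client.
+   */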
+  private static class Application implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
+      S3OutputTestModule outputModule = dag.addModule("S3OutputModule", new S3OutputTestModule());
+
+      dag.addStream("FileMetaData", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);
+      dag.addStream("BlocksMetaData", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput)
+        .setLocality(DAG.Locality.CONTAINER_LOCAL);
+      dag.addStream("BlocksData", inputModule.messages, outputModule.blockData).setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java
new file mode 100644
index 0000000..f1cb291
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+import static org.apache.apex.malhar.lib.fs.s3.S3OutputModuleMockTest.client;
+
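+/**
+ * Test variant of {@link S3OutputModule} that overrides the operator factory methods so
+ * every operator uses the shared mocked {@link AmazonS3} client from
+ * {@link S3OutputModuleMockTest} instead of opening a real connection.
+ */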
+public class S3OutputTestModule extends S3OutputModule
+{
+
+  public static class S3InitiateFileUploadTest extends S3InitiateFileUploadOperator
+  {
+    @Override
+    protected AmazonS3 createClient()
+    {
+      return client;
+    }
+  }
+
+  public static class S3BlockUploadTest extends S3BlockUploadOperator
+  {
+    @Override
+    protected AmazonS3 createClient()
+    {
+      return client;
+    }
+  }
+
+  public static class S3FileMergerTest extends S3FileMerger
+  {
+    @Override
+    protected AmazonS3 createClient()
+    {
+      return client;
+    }
+  }
+
+  @Override
+  protected S3InitiateFileUploadOperator createS3InitiateUpload()
+  {
+    return new S3InitiateFileUploadTest();
+  }
+
+  @Override
+  protected S3BlockUploadOperator createS3BlockUpload()
+  {
+    return new S3BlockUploadTest();
+  }
+
+  @Override
+  protected S3FileMerger createS3FileMerger()
+  {
+    return new S3FileMergerTest();
+  }
+}