Posted to commits@apex.apache.org by bh...@apache.org on 2016/12/01 05:28:54 UTC
apex-malhar git commit: APEXMALHAR-2022 Development of S3 Output Module
Repository: apex-malhar
Updated Branches:
refs/heads/master 243dceb56 -> a5e8fa3fa
APEXMALHAR-2022 Development of S3 Output Module
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a5e8fa3f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a5e8fa3f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a5e8fa3f
Branch: refs/heads/master
Commit: a5e8fa3facca750f5d7402c2c29e7cbabe53bd9e
Parents: 243dceb
Author: chaitanya <ch...@apache.org>
Authored: Wed Nov 30 10:47:36 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Wed Nov 30 16:31:42 2016 +0530
----------------------------------------------------------------------
.../malhar/lib/fs/s3/S3BlockUploadOperator.java | 538 +++++++++++++++++++
.../apex/malhar/lib/fs/s3/S3FileMerger.java | 302 +++++++++++
.../lib/fs/s3/S3InitiateFileUploadOperator.java | 378 +++++++++++++
.../apex/malhar/lib/fs/s3/S3OutputModule.java | 325 +++++++++++
.../fs/s3/S3InitiateFileUploadOperatorTest.java | 119 ++++
.../lib/fs/s3/S3OutputModuleMockTest.java | 171 ++++++
.../malhar/lib/fs/s3/S3OutputTestModule.java | 72 +++
7 files changed, 1905 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java
new file mode 100644
index 0000000..aafa3a7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3BlockUploadOperator.java
@@ -0,0 +1,538 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This operator uploads blocks into an S3 bucket using either the multipart upload
+ * feature or the putObject API. A block is uploaded via the multipart feature only if
+ * the file consists of more than one block; a standalone sketch of the two upload
+ * paths follows this file's diff. This operator is useful in the context of the S3 Output Module.
+ */
+
+@InterfaceStability.Evolving
+public class S3BlockUploadOperator implements Operator, Operator.CheckpointNotificationListener, Operator.IdleTimeHandler
+{
+ private static final Logger LOG = LoggerFactory.getLogger(S3BlockUploadOperator.class);
+ @NotNull
+ private String bucketName;
+ @NotNull
+ private String accessKey;
+ @NotNull
+ private String secretAccessKey;
+ private String endPoint;
+ private Map<String, S3BlockMetaData> blockInfo = new HashMap<>();
+ private transient Map<Long, String> blockIdToFilePath = new HashMap<>();
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+ protected transient AmazonS3 s3Client;
+ private transient long currentWindowId;
+ private transient List<AbstractBlockReader.ReaderRecord<Slice>> waitingTuples;
+ private transient Map<String, UploadBlockMetadata> currentWindowRecoveryState;
+ public final transient DefaultOutputPort<UploadBlockMetadata> output = new DefaultOutputPort<>();
+
+ /**
+ * This input port receives incoming tuples (block data).
+ */
+ public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockInput = new DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>>()
+ {
+ @Override
+ public void process(AbstractBlockReader.ReaderRecord<Slice> tuple)
+ {
+ uploadBlockIntoS3(tuple);
+ }
+ };
+
+ /**
+ * Input port to receive block metadata
+ */
+ public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
+ {
+ @Override
+ public void process(BlockMetadata.FileBlockMetadata blockMetadata)
+ {
+ if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+ return;
+ }
+ blockIdToFilePath.put(blockMetadata.getBlockId(), blockMetadata.getFilePath());
+ LOG.debug("received blockId {} for file {} ", blockMetadata.getBlockId(), blockMetadata.getFilePath());
+ }
+ };
+
+ /**
+ * Input port to receive upload file metadata.
+ */
+ public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> uploadMetadataInput = new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>()
+ {
+ @Override
+ public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+ {
+ processUploadFileMetadata(tuple);
+ }
+ };
+
+ /**
+ * Converts each block of the given file into S3BlockMetaData
+ * @param tuple UploadFileMetadata
+ */
+ protected void processUploadFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+ {
+ long[] blocks = tuple.getFileMetadata().getBlockIds();
+ String filePath = tuple.getFileMetadata().getFilePath();
+ for (int i = 0; i < blocks.length; i++) {
+ String blockId = getUniqueBlockIdFromFile(blocks[i], filePath);
+ if (blockInfo.get(blockId) != null) {
+ break;
+ }
+ blockInfo.put(blockId, new S3BlockMetaData(tuple.getKeyName(), tuple.getUploadId(), i + 1));
+ }
+ if (blocks.length > 0) {
+ blockInfo.get(getUniqueBlockIdFromFile(blocks[blocks.length - 1], filePath)).setLastBlock(true);
+ }
+ }
+
+ /**
+ * Construct the unique block id from the given block id and file path.
+ * @param blockId Id of the block
+ * @param filepath given filepath
+ * @return unique block id
+ */
+ public static String getUniqueBlockIdFromFile(long blockId, String filepath)
+ {
+ return Long.toString(blockId) + "|" + filepath;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ waitingTuples = new ArrayList<>();
+ currentWindowRecoveryState = new HashMap<>();
+ windowDataManager.setup(context);
+ s3Client = createClient();
+ }
+
+ /**
+ * Create AmazonS3 client using AWS credentials
+ * @return AmazonS3
+ */
+ protected AmazonS3 createClient()
+ {
+ AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+ if (endPoint != null) {
+ client.setEndpoint(endPoint);
+ }
+ return client;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+ replay(windowId);
+ }
+ }
+
+ /**
+ * Replay the state.
+ * @param windowId replay window Id
+ */
+ protected void replay(long windowId)
+ {
+ try {
+ @SuppressWarnings("unchecked")
+ Map<String, UploadBlockMetadata> recoveredData = (Map<String, UploadBlockMetadata>)windowDataManager.retrieve(windowId);
+ if (recoveredData == null) {
+ return;
+ }
+ for (Map.Entry<String, UploadBlockMetadata> uploadBlockMetadata: recoveredData.entrySet()) {
+ output.emit(uploadBlockMetadata.getValue());
+ blockInfo.remove(uploadBlockMetadata.getKey());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (waitingTuples.size() > 0) {
+ processWaitBlocks();
+ }
+
+ for (String uniqueblockId : currentWindowRecoveryState.keySet()) {
+ long blockId = Long.parseLong(uniqueblockId.substring(0, uniqueblockId.indexOf("|")));
+ LOG.debug("Successfully uploaded {} block", blockId);
+ blockIdToFilePath.remove(blockId);
+ blockInfo.remove(uniqueblockId);
+ }
+
+ if (blockIdToFilePath.size() > 0) {
+ for (Long blockId : blockIdToFilePath.keySet()) {
+ LOG.info("Unable to uploaded {} block", blockId);
+ }
+ blockIdToFilePath.clear();
+ }
+
+ if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+ try {
+ windowDataManager.save(currentWindowRecoveryState, currentWindowId);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save recovery", e);
+ }
+ }
+ currentWindowRecoveryState.clear();
+ }
+
+ @Override
+ public void teardown()
+ {
+ windowDataManager.teardown();
+ }
+
+ /**
+ * Process the blocks which are in wait state.
+ */
+ private void processWaitBlocks()
+ {
+ Iterator<AbstractBlockReader.ReaderRecord<Slice>> waitIterator = waitingTuples.iterator();
+
+ while (waitIterator.hasNext()) {
+ AbstractBlockReader.ReaderRecord<Slice> blockData = waitIterator.next();
+ String filePath = blockIdToFilePath.get(blockData.getBlockId());
+ if (filePath != null && blockInfo.get(getUniqueBlockIdFromFile(blockData.getBlockId(), filePath)) != null) {
+ uploadBlockIntoS3(blockData);
+ waitIterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Uploads the block into the S3 bucket.
+ * @param tuple block data
+ */
+ protected void uploadBlockIntoS3(AbstractBlockReader.ReaderRecord<Slice> tuple)
+ {
+ if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+ return;
+ }
+ // Check whether the block metadata is present for this block
+ if (blockIdToFilePath.get(tuple.getBlockId()) == null) {
+ if (!waitingTuples.contains(tuple)) {
+ waitingTuples.add(tuple);
+ }
+ return;
+ }
+ String uniqueBlockId = getUniqueBlockIdFromFile(tuple.getBlockId(), blockIdToFilePath.get(tuple.getBlockId()));
+ S3BlockMetaData metaData = blockInfo.get(uniqueBlockId);
+ // Check whether the file metadata is received
+ if (metaData == null) {
+ if (!waitingTuples.contains(tuple)) {
+ waitingTuples.add(tuple);
+ }
+ return;
+ }
+ long partSize = tuple.getRecord().length;
+ PartETag partETag = null;
+ ByteArrayInputStream bis = new ByteArrayInputStream(tuple.getRecord().buffer);
+ // Check whether this is the only block of a single-block file
+ if (metaData.isLastBlock && metaData.partNo == 1) {
+ ObjectMetadata omd = createObjectMetadata();
+ omd.setContentLength(partSize);
+ PutObjectResult result = s3Client.putObject(new PutObjectRequest(bucketName, metaData.getKeyName(), bis, omd));
+ partETag = new PartETag(1, result.getETag());
+ } else {
+ // Otherwise upload the block using the multipart upload feature
+ try {
+ // Create request to upload a part.
+ UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(bucketName).withKey(metaData.getKeyName())
+ .withUploadId(metaData.getUploadId()).withPartNumber(metaData.getPartNo()).withInputStream(bis).withPartSize(partSize);
+ partETag = s3Client.uploadPart(uploadRequest).getPartETag();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ UploadBlockMetadata uploadmetadata = new UploadBlockMetadata(partETag, metaData.getKeyName());
+ output.emit(uploadmetadata);
+ currentWindowRecoveryState.put(uniqueBlockId, uploadmetadata);
+ try {
+ bis.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Creates empty object metadata for the putObject request.
+ * @return the ObjectMetadata
+ */
+ public ObjectMetadata createObjectMetadata()
+ {
+ return new ObjectMetadata();
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.committed(windowId);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void handleIdleTime()
+ {
+ if (waitingTuples.size() > 0) {
+ processWaitBlocks();
+ }
+ }
+
+ /**
+ * Upload block metadata consists of partETag and key name.
+ */
+ public static class UploadBlockMetadata
+ {
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private PartETag partETag;
+ private String keyName;
+
+ // For Kryo
+ public UploadBlockMetadata()
+ {
+ }
+
+ public UploadBlockMetadata(PartETag partETag, String keyName)
+ {
+ this.partETag = partETag;
+ this.keyName = keyName;
+ }
+
+ /**
+ * Get the partETag of the block
+ * @return the partETag
+ */
+ public PartETag getPartETag()
+ {
+ return partETag;
+ }
+
+ /**
+ * Return the key name of the file
+ * @return key name
+ */
+ public String getKeyName()
+ {
+ return keyName;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return keyName.hashCode();
+ }
+ }
+
+ /**
+ * S3 block metadata consists of the key name, upload id, part number, and whether the block is the last block of the file.
+ */
+ public static class S3BlockMetaData
+ {
+ private String keyName;
+ private String uploadId;
+ private Integer partNo;
+ private boolean isLastBlock;
+
+ // For Kryo Serialization
+ public S3BlockMetaData()
+ {
+ }
+
+ public S3BlockMetaData(String keyName, String uploadId, Integer partNo)
+ {
+ this.keyName = keyName;
+ this.uploadId = uploadId;
+ this.partNo = partNo;
+ this.isLastBlock = false;
+ }
+
+ /**
+ * Return the key name of the file
+ * @return key name
+ */
+ public String getKeyName()
+ {
+ return keyName;
+ }
+
+ /**
+ * Return the upload id of the block
+ * @return the upload id
+ */
+ public String getUploadId()
+ {
+ return uploadId;
+ }
+
+ /**
+ * Return the part number of the block
+ * @return the part number
+ */
+ public Integer getPartNo()
+ {
+ return partNo;
+ }
+
+ /**
+ * Returns whether this block is the last block of the file.
+ * @return isLastBlock
+ */
+ public boolean isLastBlock()
+ {
+ return isLastBlock;
+ }
+
+ /**
+ * Sets whether this block is the last block of the file.
+ * @param lastBlock true if this is the last block of the file
+ */
+ public void setLastBlock(boolean lastBlock)
+ {
+ isLastBlock = lastBlock;
+ }
+ }
+
+ /**
+ * Returns the name of the bucket in which to upload the blocks.
+ * @return bucket name
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Sets the name of the bucket in which to upload the blocks.
+ * @param bucketName bucket name
+ */
+ public void setBucketName(@NotNull String bucketName)
+ {
+ this.bucketName = Preconditions.checkNotNull(bucketName);
+ }
+
+ /**
+ * Return the AWS access key
+ * @return access key
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Sets the AWS access key
+ * @param accessKey access key
+ */
+ public void setAccessKey(@NotNull String accessKey)
+ {
+ this.accessKey = Preconditions.checkNotNull(accessKey);
+ }
+
+ /**
+ * Return the AWS secret access key
+ * @return the secret access key
+ */
+ public String getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ /**
+ * Sets the AWS secret access key
+ * @param secretAccessKey secret access key
+ */
+ public void setSecretAccessKey(@NotNull String secretAccessKey)
+ {
+ this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+ }
+
+ /**
+ * Return the AWS S3 end point
+ * @return S3 end point
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+
+ /**
+ * Sets the AWS S3 end point
+ * @param endPoint S3 end point
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+}
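
For readers tracing uploadBlockIntoS3 above: a minimal standalone sketch of the two
upload paths, assuming an already-configured AmazonS3 client. The bucket, key, upload
id and block bytes are placeholders, not values from this commit.

import java.io.ByteArrayInputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;

public class BlockUploadSketch
{
  // Mirrors the branch in uploadBlockIntoS3(): the only block of a single-block
  // file goes through putObject; blocks of multi-block files are uploaded as
  // numbered parts of a multipart upload.
  public static PartETag uploadBlock(AmazonS3 s3, String bucket, String key,
      String uploadId, int partNo, boolean singleBlockFile, byte[] block)
  {
    ByteArrayInputStream bis = new ByteArrayInputStream(block);
    if (singleBlockFile) {
      ObjectMetadata omd = new ObjectMetadata();
      omd.setContentLength(block.length);
      String eTag = s3.putObject(new PutObjectRequest(bucket, key, bis, omd)).getETag();
      return new PartETag(1, eTag);
    }
    UploadPartRequest request = new UploadPartRequest().withBucketName(bucket).withKey(key)
        .withUploadId(uploadId).withPartNumber(partNo).withInputStream(bis).withPartSize(block.length);
    return s3.uploadPart(request).getPartETag();
  }
}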
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java
new file mode 100644
index 0000000..96fbc29
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3FileMerger.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+/**
+ * This operator merges uploaded S3 blocks into a file. It issues an S3
+ * CompleteMultipartUploadRequest once all the blocks of a file have been uploaded via
+ * the multipart feature; a sketch of the completion call follows this file's diff.
+ * This operator is useful in the context of the S3 Output Module.
+ */
+
+@InterfaceStability.Evolving
+public class S3FileMerger implements Operator, Operator.CheckpointNotificationListener
+{
+ private static final Logger LOG = LoggerFactory.getLogger(S3FileMerger.class);
+ @NotNull
+ private String bucketName;
+ @NotNull
+ private String accessKey;
+ @NotNull
+ private String secretAccessKey;
+ private String endPoint;
+ protected transient List<String> uploadedFiles = new ArrayList<>();
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private Map<String, List<PartETag>> uploadParts = new HashMap<>();
+ private Map<String, S3InitiateFileUploadOperator.UploadFileMetadata> fileMetadatas = new HashMap<>();
+ protected transient long currentWindowId;
+ protected transient AmazonS3 s3Client;
+
+ /**
+ * Input port to receive UploadBlockMetadata
+ */
+ public final transient DefaultInputPort<S3BlockUploadOperator.UploadBlockMetadata> uploadMetadataInput = new DefaultInputPort<S3BlockUploadOperator.UploadBlockMetadata>()
+ {
+ @Override
+ public void process(S3BlockUploadOperator.UploadBlockMetadata tuple)
+ {
+ processUploadBlock(tuple);
+ }
+ };
+
+ /**
+ * Records the uploaded block's PartETag and merges the file once its metadata is known.
+ * @param tuple uploaded block meta data
+ */
+ protected void processUploadBlock(S3BlockUploadOperator.UploadBlockMetadata tuple)
+ {
+ List<PartETag> listOfUploads = uploadParts.get(tuple.getKeyName());
+ if (listOfUploads == null) {
+ listOfUploads = new ArrayList<>();
+ uploadParts.put(tuple.getKeyName(), listOfUploads);
+ }
+ listOfUploads.add(tuple.getPartETag());
+ if (fileMetadatas.get(tuple.getKeyName()) != null) {
+ verifyAndEmitFileMerge(tuple.getKeyName());
+ }
+ }
+
+ /**
+ * Input port to receive UploadFileMetadata
+ */
+ public final transient DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata> filesMetadataInput = new DefaultInputPort<S3InitiateFileUploadOperator.UploadFileMetadata>()
+ {
+ @Override
+ public void process(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+ {
+ processFileMetadata(tuple);
+ }
+ };
+
+ /**
+ * Records the file metadata and merges the file if all of its blocks are already uploaded.
+ * @param tuple file metadata
+ */
+ protected void processFileMetadata(S3InitiateFileUploadOperator.UploadFileMetadata tuple)
+ {
+ String keyName = tuple.getKeyName();
+ fileMetadatas.put(keyName, tuple);
+ if (uploadParts.get(keyName) != null) {
+ verifyAndEmitFileMerge(keyName);
+ }
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ windowDataManager.setup(context);
+ s3Client = createClient();
+ }
+
+ /**
+ * Create AmazonS3 client using AWS credentials
+ * @return AmazonS3
+ */
+ protected AmazonS3 createClient()
+ {
+ AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+ if (endPoint != null) {
+ client.setEndpoint(endPoint);
+ }
+ return client;
+ }
+
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (uploadedFiles.size() > 0) {
+ for (String keyName: uploadedFiles) {
+ uploadParts.remove(keyName);
+ fileMetadatas.remove(keyName);
+ }
+ uploadedFiles.clear();
+ }
+ if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+ try {
+ windowDataManager.save("Uploaded Files", currentWindowId);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save recovery", e);
+ }
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ windowDataManager.teardown();
+ }
+
+ /**
+ * Sends the CompleteMultipartUploadRequest to S3 once all the blocks of a file have been uploaded.
+ * @param keyName key name of the file to complete in S3
+ */
+ private void verifyAndEmitFileMerge(String keyName)
+ {
+ if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+ return;
+ }
+ S3InitiateFileUploadOperator.UploadFileMetadata uploadFileMetadata = fileMetadatas.get(keyName);
+ List<PartETag> partETags = uploadParts.get(keyName);
+ if (partETags == null || uploadFileMetadata == null ||
+ uploadFileMetadata.getFileMetadata().getNumberOfBlocks() != partETags.size()) {
+ return;
+ }
+
+ if (partETags.size() <= 1) {
+ uploadedFiles.add(keyName);
+ LOG.debug("Uploaded file {} successfully", keyName);
+ return;
+ }
+
+ CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName,
+ keyName, uploadFileMetadata.getUploadId(), partETags);
+ CompleteMultipartUploadResult result = s3Client.completeMultipartUpload(compRequest);
+ if (result.getETag() != null) {
+ uploadedFiles.add(keyName);
+ LOG.debug("Uploaded file {} successfully", keyName);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.committed(windowId);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Return the name of the bucket in which to upload the files
+ * @return name of the bucket
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Sets the name of the bucket in which to upload the files.
+ * @param bucketName name of the bucket
+ */
+ public void setBucketName(@NotNull String bucketName)
+ {
+ this.bucketName = Preconditions.checkNotNull(bucketName);
+ }
+
+ /**
+ * Return the AWS access key
+ * @return the access key
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Sets the AWS access key
+ * @param accessKey AWS access key
+ */
+ public void setAccessKey(@NotNull String accessKey)
+ {
+ this.accessKey = Preconditions.checkNotNull(accessKey);
+ }
+
+ /**
+ * Returns the AWS secret access key
+ * @return AWS secret access key
+ */
+ public String getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ /**
+ * Sets the AWS secret access key
+ * @param secretAccessKey AWS secret access key
+ */
+ public void setSecretAccessKey(@NotNull String secretAccessKey)
+ {
+ this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+ }
+
+ /**
+ * Get the AWS S3 end point
+ * @return the AWS S3 end point
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+
+ /**
+ * Set the S3 end point
+ * @param endPoint end point
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+}
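
To make the merge step concrete: a hedged standalone sketch of the completion request
S3FileMerger sends once every part of a file has an ETag, as in verifyAndEmitFileMerge
above. The bucket, key, upload id and part list are placeholders.

import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;

public class FileMergeSketch
{
  // Completes the multipart upload once every expected part has reported an ETag;
  // returns true when S3 acknowledges the merged object.
  public static boolean completeUpload(AmazonS3 s3, String bucket, String key,
      String uploadId, List<PartETag> partETags, int expectedParts)
  {
    if (partETags.size() != expectedParts) {
      return false; // some blocks are still in flight
    }
    CompleteMultipartUploadResult result = s3.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
    return result.getETag() != null;
  }
}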
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java
new file mode 100644
index 0000000..3a38265
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperator.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+
+/**
+ * This operator initiates an S3 file upload for each incoming file and emits the upload id.
+ * A multipart upload is initiated only if the file consists of more than one block;
+ * a sketch of the initiate step follows this file's diff.
+ * This operator is useful in the context of the S3 Output Module.
+ */
+@InterfaceStability.Evolving
+public class S3InitiateFileUploadOperator implements Operator, Operator.CheckpointNotificationListener
+{
+ @NotNull
+ private String bucketName;
+ @NotNull
+ private String accessKey;
+ @NotNull
+ private String secretAccessKey;
+ private String endPoint;
+ @NotNull
+ private String outputDirectoryPath;
+ private WindowDataManager windowDataManager = new FSWindowDataManager();
+ protected transient AmazonS3 s3Client;
+ protected transient long currentWindowId;
+ protected transient List<UploadFileMetadata> currentWindowRecoveryState;
+
+ public final transient DefaultOutputPort<UploadFileMetadata> fileMetadataOutput = new DefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<UploadFileMetadata> uploadMetadataOutput = new DefaultOutputPort<>();
+
+ /**
+ * This input port receives file metadata; the corresponding files will be uploaded into S3.
+ */
+ public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>()
+ {
+ @Override
+ public void process(AbstractFileSplitter.FileMetadata tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ /**
+ * For the input file, initiates the upload and emits the UploadFileMetadata through the
+ * fileMetadataOutput and uploadMetadataOutput ports.
+ * @param tuple given tuple
+ */
+ protected void processTuple(AbstractFileSplitter.FileMetadata tuple)
+ {
+ if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
+ return;
+ }
+ String keyName = getKeyName(tuple.getFilePath());
+ String uploadId = "";
+ if (tuple.getNumberOfBlocks() > 1) {
+ InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, keyName);
+ initRequest.setObjectMetadata(createObjectMetadata());
+ InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
+ uploadId = initResponse.getUploadId();
+ }
+ UploadFileMetadata uploadFileMetadata = new UploadFileMetadata(tuple, uploadId, keyName);
+ fileMetadataOutput.emit(uploadFileMetadata);
+ uploadMetadataOutput.emit(uploadFileMetadata);
+ currentWindowRecoveryState.add(uploadFileMetadata);
+ }
+
+ /**
+ * Creates empty object metadata for the initiate multipart upload request.
+ * @return the ObjectMetadata
+ */
+ public ObjectMetadata createObjectMetadata()
+ {
+ return new ObjectMetadata();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ outputDirectoryPath = StringUtils.removeEnd(outputDirectoryPath, Path.SEPARATOR);
+ currentWindowRecoveryState = new ArrayList<>();
+ windowDataManager.setup(context);
+ s3Client = createClient();
+ }
+
+ /**
+ * Create AmazonS3 client using AWS credentials
+ * @return AmazonS3
+ */
+ protected AmazonS3 createClient()
+ {
+ AmazonS3 client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+ if (endPoint != null) {
+ client.setEndpoint(endPoint);
+ }
+ return client;
+ }
+
+ /**
+ * Generates the key name from the given file path and output directory path.
+ * @param filePath file path to upload
+ * @return key name for the given file
+ */
+ private String getKeyName(String filePath)
+ {
+ return outputDirectoryPath + Path.SEPARATOR + StringUtils.removeStart(filePath, Path.SEPARATOR);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+ replay(windowId);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
+ try {
+ windowDataManager.save(currentWindowRecoveryState, currentWindowId);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to save recovery", e);
+ }
+ }
+ currentWindowRecoveryState.clear();
+ }
+
+ @Override
+ public void teardown()
+ {
+ windowDataManager.teardown();
+ }
+
+ protected void replay(long windowId)
+ {
+ try {
+ @SuppressWarnings("unchecked")
+ List<UploadFileMetadata> recoveredData = (List<UploadFileMetadata>)windowDataManager.retrieve(windowId);
+ if (recoveredData != null) {
+ for (UploadFileMetadata upfm : recoveredData) {
+ uploadMetadataOutput.emit(upfm);
+ fileMetadataOutput.emit(upfm);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ windowDataManager.committed(windowId);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Return the name of the bucket in which to create the multipart upload.
+ * @return bucket name
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Set the name of the bucket in which to create the multipart upload.
+ * @param bucketName bucket name
+ */
+ public void setBucketName(@NotNull String bucketName)
+ {
+ this.bucketName = Preconditions.checkNotNull(bucketName);
+ }
+
+ /**
+ * Return the AWS access key
+ * @return AWS access key
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Sets the AWS access key
+ * @param accessKey given access key
+ */
+ public void setAccessKey(@NotNull String accessKey)
+ {
+ this.accessKey = Preconditions.checkNotNull(accessKey);
+ }
+
+ /**
+ * Return the AWS secret access key
+ * @return the AWS secret access key
+ */
+ public String getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ /**
+ * Sets the AWS secret access key
+ * @param secretAccessKey secret access key
+ */
+ public void setSecretAccessKey(@NotNull String secretAccessKey)
+ {
+ this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+ }
+
+ /**
+ * Returns the output directory path for the files to upload.
+ * @return the output directory path
+ */
+ public String getOutputDirectoryPath()
+ {
+ return outputDirectoryPath;
+ }
+
+ /**
+ * Sets the output directory path for uploading new files.
+ * @param outputDirectoryPath output directory path
+ */
+ public void setOutputDirectoryPath(@NotNull String outputDirectoryPath)
+ {
+ this.outputDirectoryPath = Preconditions.checkNotNull(outputDirectoryPath);
+ }
+
+ /**
+ * Returns the window data manager.
+ * @return the windowDataManager
+ */
+ public WindowDataManager getWindowDataManager()
+ {
+ return windowDataManager;
+ }
+
+ /**
+ * Sets the window data manager
+ * @param windowDataManager given windowDataManager
+ */
+ public void setWindowDataManager(@NotNull WindowDataManager windowDataManager)
+ {
+ this.windowDataManager = Preconditions.checkNotNull(windowDataManager);
+ }
+
+ /**
+ * Returns the AWS S3 end point
+ * @return the S3 end point
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+
+ /**
+ * Sets the AWS S3 end point
+ * @param endPoint S3 end point
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+
+ /**
+ * File upload metadata containing the file metadata, upload id, and key name.
+ */
+ public static class UploadFileMetadata
+ {
+ private AbstractFileSplitter.FileMetadata fileMetadata;
+ private String uploadId;
+ private String keyName;
+
+ // For Kryo
+ public UploadFileMetadata()
+ {
+ }
+
+ public UploadFileMetadata(AbstractFileSplitter.FileMetadata fileMetadata, String uploadId, String keyName)
+ {
+ this.fileMetadata = fileMetadata;
+ this.uploadId = uploadId;
+ this.keyName = keyName;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return keyName.hashCode();
+ }
+
+ /**
+ * Returns the name of the key generated from file path.
+ * @return the key name
+ */
+ public String getKeyName()
+ {
+ return keyName;
+ }
+
+ /**
+ * Return the file metadata of a file.
+ * @return the fileMetadata
+ */
+ public AbstractFileSplitter.FileMetadata getFileMetadata()
+ {
+ return fileMetadata;
+ }
+
+ /**
+ * Returns the unique upload id of a file.
+ * @return the upload Id of a file
+ */
+ public String getUploadId()
+ {
+ return uploadId;
+ }
+ }
+}
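
As a hedged illustration of the initiate step in processTuple above: the sketch below
derives a key name from an output directory and file path (using "/" in place of
Path.SEPARATOR) and initiates a multipart upload only when the file spans more than
one block. All names are placeholders.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;

public class InitiateUploadSketch
{
  // Returns the multipart upload id, or an empty string for single-block files,
  // which are later sent through putObject and need no upload id.
  public static String initiate(AmazonS3 s3, String bucket, String outputDir,
      String filePath, int numberOfBlocks)
  {
    // Key is the file path re-rooted under the output directory, as in getKeyName().
    String relative = filePath.startsWith("/") ? filePath.substring(1) : filePath;
    String key = outputDir + "/" + relative;
    if (numberOfBlocks <= 1) {
      return "";
    }
    return s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
  }
}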
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java
new file mode 100644
index 0000000..6c3d8be
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModule.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.io.block.AbstractBlockReader;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.netlet.util.Slice;
+
+import static com.datatorrent.api.Context.OperatorContext.TIMEOUT_WINDOW_COUNT;
+
+/**
+ * S3OutputModule can be used to upload files or a directory into S3. The module uploads
+ * multiple blocks of the same file in parallel and merges those blocks in sequence;
+ * a usage sketch follows this file's diff.
+ *
+ * The following operators are wrapped into a single component using the Module API:
+ * - S3InitiateFileUploadOperator
+ * - S3BlockUploadOperator
+ * - S3FileMerger
+ *
+ * Initial benchmark results
+ * -------------------------
+ * The module writes 18 MB/s to S3 using the multipart upload feature with the following configuration:
+ *
+ * File size = 1 GB
+ * Partition count of S3BlockUploadOperator = 6
+ * Partition count of S3FileMerger = 1
+ * Container memory sizes of the operators in this module:
+ * S3InitiateFileUploadOperator = 1 GB
+ * S3BlockUploadOperator = 2.5 GB
+ * S3FileMerger = 2 GB
+ *
+ * @displayName S3 Output Module
+ * @tags S3, Output
+ */
+@InterfaceStability.Evolving
+public class S3OutputModule implements Module
+{
+ /**
+ * AWS access key
+ */
+ @NotNull
+ private String accessKey;
+ /**
+ * AWS secret access key
+ */
+ @NotNull
+ private String secretAccessKey;
+
+ /**
+ * S3 End point
+ */
+ private String endPoint;
+ /**
+ * Name of the bucket in which to upload the files
+ */
+ @NotNull
+ private String bucketName;
+
+ /**
+ * Path of the output directory. Relative path of the files copied will be
+ * maintained w.r.t. source directory and output directory
+ */
+ @NotNull
+ private String outputDirectoryPath;
+
+ /**
+ * Specified as a count of streaming windows. This value is applied to the operators in
+ * this module because they interact mostly with Amazon S3, so their window ids may lag
+ * behind those of upstream operators.
+ */
+ @Min(120)
+ private int timeOutWindowCount = 6000;
+
+ /**
+ * Number of partitions (instances) of the S3FileMerger operator.
+ */
+ @Min(1)
+ private int mergerCount = 1;
+ /**
+ * Input port for files metadata.
+ */
+ public final transient ProxyInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new ProxyInputPort<AbstractFileSplitter.FileMetadata>();
+
+ /**
+ * Input port for blocks metadata
+ */
+ public final transient ProxyInputPort<BlockMetadata.FileBlockMetadata> blocksMetadataInput = new ProxyInputPort<BlockMetadata.FileBlockMetadata>();
+
+ /**
+ * Input port for blocks data
+ */
+ public final transient ProxyInputPort<AbstractBlockReader.ReaderRecord<Slice>> blockData = new ProxyInputPort<AbstractBlockReader.ReaderRecord<Slice>>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // DAG for S3 Output Module as follows:
+ // ---- S3InitiateFileUploadOperator -----|
+ // | S3FileMerger
+ // ---- S3BlockUploadOperator ------------|
+
+ S3InitiateFileUploadOperator initiateUpload = dag.addOperator("InitiateUpload", createS3InitiateUpload());
+ initiateUpload.setAccessKey(accessKey);
+ initiateUpload.setSecretAccessKey(secretAccessKey);
+ initiateUpload.setBucketName(bucketName);
+ initiateUpload.setOutputDirectoryPath(outputDirectoryPath);
+
+ S3BlockUploadOperator blockUploader = dag.addOperator("BlockUpload", createS3BlockUpload());
+ blockUploader.setAccessKey(accessKey);
+ blockUploader.setSecretAccessKey(secretAccessKey);
+ blockUploader.setBucketName(bucketName);
+
+ S3FileMerger fileMerger = dag.addOperator("FileMerger", createS3FileMerger());
+ fileMerger.setAccessKey(accessKey);
+ fileMerger.setSecretAccessKey(secretAccessKey);
+ fileMerger.setBucketName(bucketName);
+
+ if (endPoint != null) {
+ initiateUpload.setEndPoint(endPoint);
+ blockUploader.setEndPoint(endPoint);
+ fileMerger.setEndPoint(endPoint);
+ }
+
+ dag.setInputPortAttribute(blockUploader.blockInput, Context.PortContext.PARTITION_PARALLEL, true);
+ dag.setInputPortAttribute(blockUploader.blockMetadataInput, Context.PortContext.PARTITION_PARALLEL, true);
+
+ dag.setAttribute(initiateUpload, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+ dag.setAttribute(blockUploader, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+ dag.setAttribute(fileMerger, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+ dag.setUnifierAttribute(blockUploader.output, TIMEOUT_WINDOW_COUNT, timeOutWindowCount);
+
+ dag.setAttribute(fileMerger, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<S3FileMerger>(mergerCount));
+ // Add Streams
+ dag.addStream("InitiateUploadIDToMerger", initiateUpload.fileMetadataOutput, fileMerger.filesMetadataInput);
+ dag.addStream("InitiateUploadIDToWriter", initiateUpload.uploadMetadataOutput, blockUploader.uploadMetadataInput);
+ dag.addStream("WriterToMerger", blockUploader.output, fileMerger.uploadMetadataInput);
+
+ // Set the proxy ports
+ filesMetadataInput.set(initiateUpload.filesMetadataInput);
+ blocksMetadataInput.set(blockUploader.blockMetadataInput);
+ blockData.set(blockUploader.blockInput);
+ }
+
+ /**
+ * Create the S3InitiateFileUploadOperator for initiating the upload
+ * @return S3InitiateFileUploadOperator
+ */
+ protected S3InitiateFileUploadOperator createS3InitiateUpload()
+ {
+ return new S3InitiateFileUploadOperator();
+ }
+
+ /**
+ * Create the S3BlockUploadOperator for block upload into S3 bucket
+ * @return S3BlockUploadOperator
+ */
+ protected S3BlockUploadOperator createS3BlockUpload()
+ {
+ return new S3BlockUploadOperator();
+ }
+
+ /**
+ * Create the S3FileMerger for sending the completion request
+ * @return S3FileMerger
+ */
+ protected S3FileMerger createS3FileMerger()
+ {
+ return new S3FileMerger();
+ }
+
+ /**
+ * Get the AWS access key
+ * @return AWS access key
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Set the AWS access key
+ * @param accessKey access key
+ */
+ public void setAccessKey(String accessKey)
+ {
+ this.accessKey = accessKey;
+ }
+
+ /**
+ * Return the AWS secret access key
+ * @return AWS secret access key
+ */
+ public String getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ /**
+ * Set the AWS secret access key
+ * @param secretAccessKey AWS secret access key
+ */
+ public void setSecretAccessKey(String secretAccessKey)
+ {
+ this.secretAccessKey = secretAccessKey;
+ }
+
+
+ /**
+ * Get the name of the bucket in which to upload the files
+ * @return bucket name
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Set the name of the bucket in which to upload the files
+ * @param bucketName name of the bucket
+ */
+ public void setBucketName(String bucketName)
+ {
+ this.bucketName = bucketName;
+ }
+
+ /**
+ * Return the S3 End point
+ * @return S3 End point
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+
+ /**
+ * Set the S3 End point
+ * @param endPoint S3 end point
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+
+ /**
+ * Get the path of the output directory.
+ * @return path of output directory
+ */
+ public String getOutputDirectoryPath()
+ {
+ return outputDirectoryPath;
+ }
+
+ /**
+ * Set the path of the output directory.
+ * @param outputDirectoryPath path of output directory
+ */
+ public void setOutputDirectoryPath(String outputDirectoryPath)
+ {
+ this.outputDirectoryPath = outputDirectoryPath;
+ }
+
+ /**
+ * Get the timeout, as a count of streaming windows, after which an operator in this module is considered stalled.
+ * @return the number of streaming windows
+ */
+ public int getTimeOutWindowCount()
+ {
+ return timeOutWindowCount;
+ }
+
+ /**
+ * Set the timeout as a count of streaming windows.
+ * @param timeOutWindowCount number of streaming windows before timing out
+ */
+ public void setTimeOutWindowCount(int timeOutWindowCount)
+ {
+ this.timeOutWindowCount = timeOutWindowCount;
+ }
+
+ /**
+ * Get the partition count of S3FileMerger operator
+ * @return the partition count
+ */
+ public int getMergerCount()
+ {
+ return mergerCount;
+ }
+
+ /**
+ * Set the partition count of S3FileMerger Operator
+ * @param mergerCount given partition count
+ */
+ public void setMergerCount(int mergerCount)
+ {
+ this.mergerCount = mergerCount;
+ }
+}
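
A minimal sketch of wiring this module into an application, modeled on the mock test
below; the application class, stream names and property values are hypothetical.
Credentials and paths would normally come from configuration rather than code.

import org.apache.apex.malhar.lib.fs.s3.S3OutputModule;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.fs.FSInputModule;

public class S3CopyApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FSInputModule input = dag.addModule("HDFSInput", new FSInputModule());
    S3OutputModule output = dag.addModule("S3Output", new S3OutputModule());
    output.setAccessKey("ACCESS_KEY");         // placeholder
    output.setSecretAccessKey("SECRET_KEY");   // placeholder
    output.setBucketName("my-bucket");         // placeholder
    output.setOutputDirectoryPath("output");   // placeholder

    // Relative paths of copied files are preserved under the output directory.
    dag.addStream("FileMetaData", input.filesMetadataOutput, output.filesMetadataInput);
    dag.addStream("BlocksMetaData", input.blocksMetadataOutput, output.blocksMetadataInput);
    dag.addStream("BlocksData", input.messages, output.blockData);
  }
}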
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java
new file mode 100644
index 0000000..a077fb9
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3InitiateFileUploadOperatorTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.fs.AbstractFileSplitter;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the S3InitiateFileUploadOperator. It verifies that the upload id emitted by the
+ * operator matches the upload id returned by the mocked client.
+ */
+public class S3InitiateFileUploadOperatorTest
+{
+ private String uploadId = "uploadfile1";
+ private static final String APPLICATION_PATH_PREFIX = "target/s3outputtest/";
+ private String applicationPath;
+ private Attribute.AttributeMap.DefaultAttributeMap attributes;
+ private Context.OperatorContext context;
+ @Mock
+ public AmazonS3 client;
+ @Mock
+ public AbstractFileSplitter.FileMetadata fileMetadata;
+
+ public class S3InitiateFileUploadTest extends S3InitiateFileUploadOperator
+ {
+ @Override
+ protected AmazonS3 createClient()
+ {
+ return client;
+ }
+ }
+
+ @Before
+ public void beforeTest()
+ {
+ applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+ attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, applicationPath);
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+ }
+
+ @After
+ public void afterTest()
+ {
+ Path root = new Path(applicationPath);
+ try {
+ FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+ fs.delete(root, true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testInitiateUpload()
+ {
+ InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+ result.setUploadId(uploadId);
+
+ MockitoAnnotations.initMocks(this);
+ when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
+ when(fileMetadata.getFilePath()).thenReturn("/tmp/file1.txt");
+ when(fileMetadata.getNumberOfBlocks()).thenReturn(4);
+
+ S3InitiateFileUploadTest operator = new S3InitiateFileUploadTest();
+ operator.setBucketName("testbucket");
+ operator.setup(context);
+
+ CollectorTestSink<S3InitiateFileUploadOperator.UploadFileMetadata> fileSink = new CollectorTestSink<>();
+ CollectorTestSink<Object> tmp = (CollectorTestSink)fileSink;
+ operator.fileMetadataOutput.setSink(tmp);
+ operator.beginWindow(0);
+ operator.processTuple(fileMetadata);
+ operator.endWindow();
+
+ S3InitiateFileUploadOperator.UploadFileMetadata emitted = (S3InitiateFileUploadOperator.UploadFileMetadata)tmp.collectedTuples.get(0);
+ Assert.assertEquals("Upload ID :", uploadId, emitted.getUploadId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java
new file mode 100644
index 0000000..8fe5ef9
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputModuleMockTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.fs.FSInputModule;
+import com.datatorrent.stram.StramLocalCluster;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies S3OutputModule in a local application. The application reads data from the "input"
+ * directory on the local file system and uploads the files into the "output" directory,
+ * with all S3 interactions mocked.
+ */
+public class S3OutputModuleMockTest
+{
+ private String uploadId = "uploadfile";
+ private static final String APPLICATION_PATH_PREFIX = "target/s3outputmocktest/";
+ private static final String FILE_DATA = "Testing the S3OutputModule. This File has more data hence more blocks.";
+ private static final String FILE = "file.txt";
+ private String inputDir;
+ private String outputDir;
+ private String applicationPath;
+ private File inputFile;
+ @Mock
+ public static AmazonS3 client;
+
+ @Before
+ public void beforeTest() throws IOException
+ {
+ applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+ inputDir = applicationPath + File.separator + "input";
+ outputDir = applicationPath + File.separator + "output";
+ inputFile = new File(inputDir + File.separator + FILE);
+ FileUtils.writeStringToFile(inputFile, FILE_DATA);
+ }
+
+ @After
+ public void afterTest()
+ {
+ Path root = new Path(applicationPath);
+ try {
+ FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+ fs.delete(root, true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private CompleteMultipartUploadResult completeMultiPart() throws IOException
+ {
+ FileUtils.copyFile(inputFile, new File(outputDir + File.separator + FILE));
+ CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
+ result.setETag(outputDir);
+ return result;
+ }
+
+ @Test
+ public void testS3OutputModule() throws Exception
+ {
+ InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+ result.setUploadId(uploadId);
+
+ PutObjectResult objResult = new PutObjectResult();
+ objResult.setETag("SuccessFullyUploaded");
+
+ UploadPartResult partResult = new UploadPartResult();
+ partResult.setPartNumber(1);
+ partResult.setETag("SuccessFullyPartUploaded");
+
+ MockitoAnnotations.initMocks(this);
+ when(client.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class))).thenReturn(result);
+ when(client.putObject(any(PutObjectRequest.class))).thenReturn(objResult);
+ when(client.uploadPart(any(UploadPartRequest.class))).thenReturn(partResult);
+ when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn(completeMultiPart());
+
+ Application app = new S3OutputModuleMockTest.Application();
+ Configuration conf = new Configuration();
+ conf.set("dt.operator.HDFSInputModule.prop.files", inputDir);
+ conf.set("dt.operator.HDFSInputModule.prop.blockSize", "10");
+ conf.set("dt.operator.HDFSInputModule.prop.blocksThreshold", "1");
+ conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","20");
+
+ conf.set("dt.operator.S3OutputModule.prop.accessKey", "accessKey");
+ conf.set("dt.operator.S3OutputModule.prop.secretAccessKey", "secretKey");
+ conf.set("dt.operator.S3OutputModule.prop.bucketName", "bucketKey");
+ conf.set("dt.operator.S3OutputModule.prop.outputDirectoryPath", outputDir);
+
+ Path outDir = new Path("file://" + new File(outputDir).getAbsolutePath());
+ final Path outputFilePath = new Path(outDir.toString() + File.separator + FILE);
+ final FileSystem fs = FileSystem.newInstance(outDir.toUri(), new Configuration());
+ LocalMode lma = LocalMode.newInstance();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(true);
+
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return fs.exists(outputFilePath);
+ }
+ });
+ lc.run(10000);
+
+ Assert.assertTrue("output file exist", fs.exists(outputFilePath));
+ }
+
+ private static class Application implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ FSInputModule inputModule = dag.addModule("HDFSInputModule", new FSInputModule());
+ S3OutputTestModule outputModule = dag.addModule("S3OutputModule", new S3OutputTestModule());
+
+ dag.addStream("FileMetaData", inputModule.filesMetadataOutput, outputModule.filesMetadataInput);
+ dag.addStream("BlocksMetaData", inputModule.blocksMetadataOutput, outputModule.blocksMetadataInput)
+ .setLocality(DAG.Locality.CONTAINER_LOCAL);
+ dag.addStream("BlocksData", inputModule.messages, outputModule.blockData).setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a5e8fa3f/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java
new file mode 100644
index 0000000..f1cb291
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3OutputTestModule.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.fs.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+import static org.apache.apex.malhar.lib.fs.s3.S3OutputModuleMockTest.client;
+
+public class S3OutputTestModule extends S3OutputModule
+{
+
+ public static class S3InitiateFileUploadTest extends S3InitiateFileUploadOperator
+ {
+ @Override
+ protected AmazonS3 createClient()
+ {
+ return client;
+ }
+ }
+
+ public static class S3BlockUploadTest extends S3BlockUploadOperator
+ {
+ @Override
+ protected AmazonS3 createClient()
+ {
+ return client;
+ }
+ }
+
+ public static class S3FileMergerTest extends S3FileMerger
+ {
+ @Override
+ protected AmazonS3 createClient()
+ {
+ return client;
+ }
+ }
+
+ @Override
+ protected S3InitiateFileUploadOperator createS3InitiateUpload()
+ {
+ return new S3InitiateFileUploadTest();
+ }
+
+ @Override
+ protected S3BlockUploadOperator createS3BlockUpload()
+ {
+ return new S3BlockUploadTest();
+ }
+
+ @Override
+ protected S3FileMerger createS3FileMerger()
+ {
+ return new S3FileMergerTest();
+ }
+}