You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/25 16:40:34 UTC
apex-malhar git commit: APEXMALHAR-2369 S3 output-tuple-based
Repository: apex-malhar
Updated Branches:
refs/heads/master dd5341f22 -> ec7b480ac
APEXMALHAR-2369 S3 output-tuple-based
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ec7b480a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ec7b480a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ec7b480a
Branch: refs/heads/master
Commit: ec7b480ac48402f1c1b0bc765c5730776ff4f15e
Parents: dd5341f
Author: yogidevendra <yo...@apache.org>
Authored: Tue Dec 20 15:38:51 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Fri Feb 24 12:32:12 2017 +0530
----------------------------------------------------------------------
.../lib/io/fs/AbstractReconciler.java | 10 +
.../StatsAwareStatelessPartitioner.java | 2 +-
.../lib/fs/FSRecordCompactionOperator.java | 177 +++++++++
.../apex/malhar/lib/fs/s3/S3Reconciler.java | 316 +++++++++++++++
.../lib/fs/s3/S3ReconcilerQueuePartitioner.java | 116 ++++++
.../malhar/lib/fs/s3/S3TupleOutputModule.java | 398 +++++++++++++++++++
.../lib/fs/FSRecordCompactionOperatorTest.java | 103 +++++
.../apex/malhar/lib/fs/s3/S3ReconcilerTest.java | 138 +++++++
8 files changed, 1259 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 0d67c31..78f340e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -25,10 +25,14 @@ import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
+
+import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator.CheckpointNotificationListener;
@@ -76,6 +80,10 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
private transient volatile boolean execute;
private transient AtomicReference<Throwable> cause;
+ @AutoMetric
+ private long queueLength;
+
+
@Override
public void setup(Context.OperatorContext context)
{
@@ -175,6 +183,7 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
}
QUEUETUPLE output = waitingTuples.remove();
processCommittedData(output);
+ --queueLength;
doneTuples.add(output);
}
} catch (Throwable e) {
@@ -195,6 +204,7 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
protected void enqueueForProcessing(QUEUETUPLE queueTuple)
{
currentWindowTuples.get(currentWindowId).add(queueTuple);
+ ++queueLength;
}
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
index ed571b4..bf648c7 100644
--- a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
@@ -64,7 +64,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
private long nextMillis;
private long partitionNextMillis;
private boolean repartition;
- private transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
+ protected transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
@Min(1)
private int initialPartitionCount = 1;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
new file mode 100644
index 0000000..1dd12c4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * This operator writes incoming tuples to files.
+ * Metadata about each finalized file is emitted on the output port for downstream processing (if any).
+ *
+ * @param <INPUT>
+ *          Type for incoming tuples. Converter needs to be defined which
+ *          converts these tuples to byte[]. Default converters for String,
+ *          byte[] tuples are provided in S3TupleOutputModule.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordCompactionOperator<INPUT> extends GenericFileOutputOperator<INPUT>
+{
+
+  /**
+   * Output port for emitting metadata for finalized files.
+   */
+  public transient DefaultOutputPort<OutputMetaData> output = new DefaultOutputPort<OutputMetaData>();
+
+  /**
+   * Queue holding metadata of finalized files until it can be emitted on the output port.
+   */
+  private Queue<OutputMetaData> emitQueue = new LinkedBlockingQueue<OutputMetaData>();
+
+  /**
+   * Name of the directory, relative to the application path, in which files are written.
+   */
+  @NotNull
+  String outputDirectoryName = "COMPACTION_OUTPUT_DIR";
+
+  /**
+   * Prefix for output file names; the application id is appended in setup.
+   */
+  @NotNull
+  String outputFileNamePrefix = "tuples-";
+
+  public FSRecordCompactionOperator()
+  {
+    filePath = "";
+    outputFileName = outputFileNamePrefix;
+    maxLength = 128 * 1024 * 1024L;
+  }
+
+  /**
+   * Resolves the output directory under the application path and the output
+   * file name from the configured prefix and application id, then delegates to the parent setup.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    filePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + outputDirectoryName;
+    outputFileName = outputFileNamePrefix + context.getValue(DAG.APPLICATION_ID);
+    super.setup(context);
+  }
+
+  /**
+   * Finalizes the file and queues its metadata (path, name, length) for emission.
+   */
+  @Override
+  protected void finalizeFile(String fileName) throws IOException
+  {
+    super.finalizeFile(fileName);
+
+    String src = filePath + Path.SEPARATOR + fileName;
+    Path srcPath = new Path(src);
+    long offset = fs.getFileStatus(srcPath).getLen();
+
+    //Add finalized files to the queue
+    OutputMetaData metaData = new OutputMetaData(src, fileName, offset);
+    //finalizeFile is called from committed callback.
+    //Tuples should be emitted only between beginWindow to endWindow. Thus using emitQueue.
+    emitQueue.add(metaData);
+  }
+
+  /**
+   * Emits metadata of all files finalized since the previous window.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    //Emit finalized files from the queue
+    while (!emitQueue.isEmpty()) {
+      output.emit(emitQueue.poll());
+    }
+  }
+
+  public String getOutputDirectoryName()
+  {
+    return outputDirectoryName;
+  }
+
+  public void setOutputDirectoryName(@NotNull String outputDirectoryName)
+  {
+    this.outputDirectoryName = Preconditions.checkNotNull(outputDirectoryName);
+  }
+
+  public String getOutputFileNamePrefix()
+  {
+    return outputFileNamePrefix;
+  }
+
+  public void setOutputFileNamePrefix(@NotNull String outputFileNamePrefix)
+  {
+    this.outputFileNamePrefix = Preconditions.checkNotNull(outputFileNamePrefix);
+  }
+
+  /**
+   * Metadata for output file for downstream processing
+   */
+  public static class OutputMetaData
+  {
+    private String path;
+    private String fileName;
+    private long size;
+
+    public OutputMetaData()
+    {
+    }
+
+    public OutputMetaData(String path, String fileName, long size)
+    {
+      this.path = path;
+      this.fileName = fileName;
+      this.size = size;
+    }
+
+    public String getPath()
+    {
+      return path;
+    }
+
+    public void setPath(String path)
+    {
+      this.path = path;
+    }
+
+    public String getFileName()
+    {
+      return fileName;
+    }
+
+    public void setFileName(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    public long getSize()
+    {
+      return size;
+    }
+
+    public void setSize(long size)
+    {
+      this.size = size;
+    }
+
+    /**
+     * Human-readable form, so that log statements printing this object show the file details
+     * instead of an identity hash.
+     */
+    @Override
+    public String toString()
+    {
+      return "OutputMetaData{path='" + path + "', fileName='" + fileName + "', size=" + size + "}";
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
new file mode 100644
index 0000000..5fd19f9
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.io.fs.AbstractReconciler;
+
+/**
+ * This operator uploads files to Amazon S3 after files are finalized and
+ * frozen by the committed callback.
+ *
+ * S3TupleOutputModule uses this operator in conjunction with S3CompactionOperator
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.OutputMetaData, FSRecordCompactionOperator.OutputMetaData>
+{
+ /**
+ * Access key id for Amazon S3
+ */
+ @NotNull
+ private String accessKey;
+
+ /**
+ * Secret key for Amazon S3
+ */
+ @NotNull
+ private String secretKey;
+
+ /**
+ * Bucket name for data upload
+ */
+ @NotNull
+ private String bucketName;
+
+ /**
+ * S3 End point
+ */
+ private String endPoint;
+
+ /**
+ * Directory name under S3 bucket
+ */
+ @NotNull
+ private String directoryName;
+
+ /**
+ * Client instance for connecting to Amazon S3
+ */
+ protected transient AmazonS3 s3client;
+
+ /**
+ * FileSystem instance for reading intermediate directory
+ */
+ protected transient FileSystem fs;
+
+ protected transient String filePath;
+
+ private static final String TMP_EXTENSION = ".tmp";
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+ filePath = context.getValue(DAG.APPLICATION_PATH);
+ try {
+ fs = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration());
+ } catch (IOException e) {
+ logger.error("Unable to create FileSystem: {}", e.getMessage());
+ }
+ super.setup(context);
+ }
+
+ /**
+ * Enques the tuple for processing after committed callback
+ */
+ @Override
+ protected void processTuple(FSRecordCompactionOperator.OutputMetaData outputMetaData)
+ {
+ logger.debug("enque : {}", outputMetaData);
+ enqueueForProcessing(outputMetaData);
+ }
+
+ /**
+ * Uploads the file on Amazon S3 using putObject API from S3 client
+ */
+ @Override
+ protected void processCommittedData(FSRecordCompactionOperator.OutputMetaData outputMetaData)
+ {
+ try {
+ Path path = new Path(outputMetaData.getPath());
+ if (fs.exists(path) == false) {
+ logger.debug("Ignoring non-existent path assuming replay : {}", path);
+ return;
+ }
+ FSDataInputStream fsinput = fs.open(path);
+ ObjectMetadata omd = new ObjectMetadata();
+ omd.setContentLength(outputMetaData.getSize());
+ String keyName = directoryName + Path.SEPARATOR + outputMetaData.getFileName();
+ PutObjectRequest request = new PutObjectRequest(bucketName, keyName, fsinput, omd);
+ if (outputMetaData.getSize() < Integer.MAX_VALUE) {
+ request.getRequestClientOptions().setReadLimit((int)outputMetaData.getSize());
+ } else {
+ throw new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE");
+ }
+ if (fs.exists(path)) {
+ logger.debug("Trying to upload : {}", path);
+ s3client.putObject(request);
+ logger.debug("Uploading : {}", keyName);
+ }
+ } catch (FileNotFoundException e) {
+ logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath());
+ } catch (IOException e) {
+ logger.error("Unable to create Stream: {}", e.getMessage());
+ }
+ }
+
+ /**
+ * Clears intermediate/temporary files if any
+ */
+ @Override
+ public void endWindow()
+ {
+ logger.info("in endWindow()");
+ while (doneTuples.peek() != null) {
+ FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll();
+ logger.debug("found metaData = {}", metaData);
+ committedTuples.remove(metaData);
+ try {
+ Path dest = new Path(metaData.getPath());
+ //Deleting the intermediate files and when writing to tmp files
+ // there can be vagrant tmp files which we have to clean
+ FileStatus[] statuses = fs.listStatus(dest.getParent());
+
+ for (FileStatus status : statuses) {
+ String statusName = status.getPath().getName();
+ if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) {
+ //a tmp file has tmp extension always preceded by timestamp
+ String actualFileName = statusName.substring(0,
+ statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
+ logger.debug("actualFileName = {}", actualFileName);
+ if (metaData.getFileName().equals(actualFileName)) {
+ logger.debug("deleting stray file {}", statusName);
+ fs.delete(status.getPath(), true);
+ }
+ } else if (statusName.equals(metaData.getFileName())) {
+ logger.info("deleting s3-compaction file {}", statusName);
+ fs.delete(status.getPath(), true);
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Unable to Delete a file: {}", metaData.getFileName());
+ }
+ }
+ }
+
+ /**
+ * Get access key id
+ *
+ * @return Access key id for Amazon S3
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Set access key id
+ *
+ * @param accessKey
+ * Access key id for Amazon S3
+ */
+ public void setAccessKey(@NotNull String accessKey)
+ {
+ this.accessKey = Preconditions.checkNotNull(accessKey);
+ }
+
+ /**
+ * Get secret key
+ *
+ * @return Secret key for Amazon S3
+ */
+ public String getSecretKey()
+ {
+ return secretKey;
+ }
+
+ /**
+ * Set secret key
+ *
+ * @param secretKey
+ * Secret key for Amazon S3
+ */
+ public void setSecretKey(@NotNull String secretKey)
+ {
+ this.secretKey = Preconditions.checkNotNull(secretKey);
+ }
+
+ /**
+ * Get bucket name
+ *
+ * @return Bucket name for data upload
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Set bucket name
+ *
+ * @param bucketName
+ * Bucket name for data upload
+ */
+ public void setBucketName(@NotNull String bucketName)
+ {
+ this.bucketName = Preconditions.checkNotNull(bucketName);
+ }
+
+ /**
+ * Get directory name
+ *
+ * @return Directory name under S3 bucket
+ */
+ public String getDirectoryName()
+ {
+ return directoryName;
+ }
+
+ /**
+ * Set directory name
+ *
+ * @param directoryName
+ * Directory name under S3 bucket
+ */
+ public void setDirectoryName(@NotNull String directoryName)
+ {
+ this.directoryName = Preconditions.checkNotNull(directoryName);
+ }
+
+ /**
+ * Return the S3 End point
+ *
+ * @return S3 End point
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+
+ /**
+ * Set the S3 End point
+ *
+ * @param endPoint
+ * S3 end point
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = endPoint;
+ }
+
+ /**
+ * Set Amazon S3 client
+ *
+ * @param s3client
+ * Client for Amazon S3
+ */
+ @VisibleForTesting
+ void setS3client(AmazonS3 s3client)
+ {
+ this.s3client = s3client;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(S3Reconciler.class);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java
new file mode 100644
index 0000000..edd0054
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.util.List;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Stats;
+import com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner;
+
+/**
+ * This partitioner looks at Reconciler queue size to decide no. of partitions.
+ * This partitioner is used for S3Reconciler Operator.
+ * @param <T>
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3ReconcilerQueuePartitioner<T extends Operator> extends StatsAwareStatelessPartitioner<T>
+{
+ private static final long serialVersionUID = -4407806429128758992L;
+
+ private int maxPartitions = 16;
+ private int minPartitions = 1;
+
+ private int maxQueueSizePerPartition = 4;
+
+ @Override
+ protected int getLoad(BatchedOperatorStats stats)
+ {
+ double totalBacklog = 0;
+ double statsPartitionCount = 0;
+ for (Map.Entry<Integer, BatchedOperatorStats> partitionStatus : partitionedInstanceStatus.entrySet()) {
+ BatchedOperatorStats batchedOperatorStats = partitionStatus.getValue();
+ if (batchedOperatorStats != null) {
+ List<Stats.OperatorStats> lastWindowedStats = batchedOperatorStats.getLastWindowedStats();
+ if (lastWindowedStats != null && lastWindowedStats.size() > 0) {
+ Stats.OperatorStats lastStats = lastWindowedStats.get(lastWindowedStats.size() - 1);
+ Long queueLength = (Long)lastStats.metrics.get("queueLength");
+ totalBacklog += queueLength;
+ statsPartitionCount += 1;
+ logger.debug("queueLength : {}, totalBacklog {},statsPartitionCount{}", queueLength, totalBacklog,
+ statsPartitionCount);
+ }
+ }
+ }
+
+ double backlogPerPartition = totalBacklog / statsPartitionCount;
+ logger.debug("backlogPerPartition : {}", backlogPerPartition);
+ logger.debug("maxQueueSizePerPartition : {}, partitionedInstanceStatus.size():{}" + ", maxPartitions:{}",
+ maxQueueSizePerPartition, partitionedInstanceStatus.size(), maxPartitions);
+
+ if (backlogPerPartition > maxQueueSizePerPartition && partitionedInstanceStatus.size() < maxPartitions) {
+ return 1;
+ }
+ logger.debug("minPartitions:{}", minPartitions);
+
+ if (backlogPerPartition < 1.1 && partitionedInstanceStatus.size() > minPartitions) {
+ return -1;
+ }
+
+ return 0;
+ }
+
+ public int getMaxPartitions()
+ {
+ return maxPartitions;
+ }
+
+ public void setMaxPartitions(int maxPartitions)
+ {
+ this.maxPartitions = maxPartitions;
+ }
+
+ public int getMinPartitions()
+ {
+ return minPartitions;
+ }
+
+ public void setMinPartitions(int minPartitions)
+ {
+ this.minPartitions = minPartitions;
+ }
+
+ public int getMaxQueueSizePerPartition()
+ {
+ return maxQueueSizePerPartition;
+ }
+
+ public void setMaxQueueSizePerPartition(int maxQueueSizePerPartition)
+ {
+ this.maxQueueSizePerPartition = maxQueueSizePerPartition;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(S3ReconcilerQueuePartitioner.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
new file mode 100644
index 0000000..59cd046
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.util.Arrays;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.NoOpConverter;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringToBytesConverter;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.lib.converter.Converter;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+
+/**
+ * S3TupleOutputModule writes incoming tuples into files and uploads these files on Amazon S3.
+ *
+ * @param <INPUT> Type of incoming Tuple.Converter needs to be defined which converts these tuples to byte[].
+ * Default converters for String, byte[] tuples are provided in
+ * S3TupleOutputModule.S3BytesOutputModule, S3TupleOutputModule.S3StringOutputModule
+ *
+ * @displayName S3 Tuple Output Module
+ * @tags S3, Output
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class S3TupleOutputModule<INPUT> implements Module
+{
+ public final transient ProxyInputPort<INPUT> input = new ProxyInputPort<INPUT>();
+
+ /**
+ * AWS access key
+ */
+ @NotNull
+ private String accessKey;
+ /**
+ * AWS secret access key
+ */
+ @NotNull
+ private String secretAccessKey;
+
+ /**
+ * S3 End point
+ */
+ private String endPoint;
+ /**
+ * Name of the bucket in which to upload the files
+ */
+ @NotNull
+ private String bucketName;
+
+ /**
+ * Path of the output directory. Relative path of the files copied will be
+ * maintained w.r.t. source directory and output directory
+ */
+ @NotNull
+ private String outputDirectoryPath;
+
+ /**
+ * Max number of idle windows for which no new data is added to current part
+ * file. Part file will be finalized after these many idle windows after last
+ * new data.
+ */
+ private long maxIdleWindows = 30;
+
+ /**
+ * The maximum length in bytes of a rolling file. The default value of this is
+ * 1MB.
+ */
+ @Min(1)
+ protected Long maxLength = 128 * 1024 * 1024L;
+
+ /**
+ * Maximum number of tuples per sec per partition for HDFS write.
+ */
+ private long maxTuplesPerSecPerPartition = 300000;
+
+ /**
+ * Minimum number of tuples per sec per partition for HDFS write.
+ */
+ private long minTuplesPerSecPerPartition = 30000;
+
+ /**
+ * Time interval in milliseconds to check for repartitioning
+ */
+ private long coolDownMillis = 1 * 60 * 1000;
+
+ /**
+ * Maximum number of S3 upload partitions
+ */
+ private int maxS3UploadPartitions = 16;
+
+ /**
+ * Minimum number of S3 upload partitions
+ */
+ private int minS3UploadPartitions = 1;
+
+ /**
+ * Maximum queue size for S3 upload
+ */
+ private int maxQueueSizeS3Upload = 4;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ FSRecordCompactionOperator<INPUT> s3compaction = dag.addOperator("S3Compaction", new FSRecordCompactionOperator<INPUT>());
+ s3compaction.setConverter(getConverter());
+ s3compaction.setMaxIdleWindows(maxIdleWindows);
+ s3compaction.setMaxLength(maxLength);
+
+ StatelessThroughputBasedPartitioner<FSRecordCompactionOperator<INPUT>> partitioner = new StatelessThroughputBasedPartitioner<FSRecordCompactionOperator<INPUT>>();
+ partitioner.setMaximumEvents(maxTuplesPerSecPerPartition);
+ partitioner.setMinimumEvents(minTuplesPerSecPerPartition);
+ partitioner.setCooldownMillis(coolDownMillis);
+ dag.setAttribute(s3compaction, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[] {partitioner}));
+ dag.setAttribute(s3compaction, OperatorContext.PARTITIONER, partitioner);
+
+ S3Reconciler s3Reconciler = dag.addOperator("S3Reconciler", new S3Reconciler());
+ s3Reconciler.setAccessKey(accessKey);
+ s3Reconciler.setSecretKey(secretAccessKey);
+ s3Reconciler.setBucketName(bucketName);
+ s3Reconciler.setEndPoint(endPoint);
+ s3Reconciler.setDirectoryName(outputDirectoryPath);
+
+ S3ReconcilerQueuePartitioner<S3Reconciler> reconcilerPartitioner = new S3ReconcilerQueuePartitioner<S3Reconciler>();
+ reconcilerPartitioner.setCooldownMillis(coolDownMillis);
+ reconcilerPartitioner.setMinPartitions(minS3UploadPartitions);
+ reconcilerPartitioner.setMaxPartitions(maxS3UploadPartitions);
+ reconcilerPartitioner.setMaxQueueSizePerPartition(maxQueueSizeS3Upload);
+
+ dag.setAttribute(s3Reconciler, OperatorContext.STATS_LISTENERS,
+ Arrays.asList(new StatsListener[] {reconcilerPartitioner}));
+ dag.setAttribute(s3Reconciler, OperatorContext.PARTITIONER, reconcilerPartitioner);
+
+ if (endPoint != null) {
+ s3Reconciler.setEndPoint(endPoint);
+ }
+ dag.addStream("write-to-s3", s3compaction.output, s3Reconciler.input);
+ input.set(s3compaction.input);
+ }
+
+ /**
+ * Get the AWS access key
+ *
+ * @return AWS access key
+ */
+ public String getAccessKey()
+ {
+ return accessKey;
+ }
+
+ /**
+ * Set the AWS access key
+ *
+ * @param accessKey
+ * access key
+ */
+ public void setAccessKey(@NotNull String accessKey)
+ {
+ this.accessKey = Preconditions.checkNotNull(accessKey);
+ }
+
+ /**
+ * Return the AWS secret access key
+ *
+ * @return AWS secret access key
+ */
+ public String getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ /**
+ * Set the AWS secret access key
+ *
+ * @param secretAccessKey
+ * AWS secret access key
+ */
+ public void setSecretAccessKey(@NotNull String secretAccessKey)
+ {
+ this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+ }
+
+ /**
+ * Get the name of the bucket in which to upload the files
+ *
+ * @return bucket name
+ */
+ public String getBucketName()
+ {
+ return bucketName;
+ }
+
+ /**
+ * Set the name of the bucket in which to upload the files
+ *
+ * @param bucketName
+ * name of the bucket
+ */
+ public void setBucketName(@NotNull String bucketName)
+ {
+ this.bucketName = Preconditions.checkNotNull(bucketName);
+ }
+
+ /**
+ * Return the S3 End point
+ *
+ * @return S3 End point
+ */
+ public String getEndPoint()
+ {
+ return endPoint;
+ }
+
+ /**
+ * Set the S3 End point
+ *
+ * @param endPoint
+ * S3 end point
+ */
+ public void setEndPoint(String endPoint)
+ {
+ this.endPoint = Preconditions.checkNotNull(endPoint);
+ }
+
+ /**
+ * Get the path of the output directory.
+ *
+ * @return path of output directory
+ */
+ public String getOutputDirectoryPath()
+ {
+ return outputDirectoryPath;
+ }
+
+ /**
+ * Set the path of the output directory.
+ *
+ * @param outputDirectoryPath
+ * path of output directory
+ */
+ public void setOutputDirectoryPath(@NotNull String outputDirectoryPath)
+ {
+ this.outputDirectoryPath = Preconditions.checkNotNull(outputDirectoryPath);
+ }
+
+ /**
+ * No. of idle window after which file should be rolled over
+ *
+ * @return max number of idle windows for rollover
+ */
+ public long getMaxIdleWindows()
+ {
+ return maxIdleWindows;
+ }
+
+ /**
+ * No. of idle window after which file should be rolled over
+ *
+ * @param maxIdleWindows
+ * max number of idle windows for rollover
+ */
+ public void setMaxIdleWindows(long maxIdleWindows)
+ {
+ this.maxIdleWindows = maxIdleWindows;
+ }
+
+ /**
+ * Get max length of file after which file should be rolled over
+ *
+ * @return max length of file
+ */
+ public Long getMaxLength()
+ {
+ return maxLength;
+ }
+
+ /**
+ * Set max length of file after which file should be rolled over
+ *
+ * @param maxLength
+ * max length of file
+ */
+ public void setMaxLength(Long maxLength)
+ {
+ this.maxLength = maxLength;
+ }
+
+ public long getMaxTuplesPerSecPerPartition()
+ {
+ return maxTuplesPerSecPerPartition;
+ }
+
+ public void setMaxTuplesPerSecPerPartition(long maxTuplesPerSecPerPartition)
+ {
+ this.maxTuplesPerSecPerPartition = maxTuplesPerSecPerPartition;
+ }
+
+ public long getMinTuplesPerSecPerPartition()
+ {
+ return minTuplesPerSecPerPartition;
+ }
+
+ public void setMinTuplesPerSecPerPartition(long minTuplesPerSecPerPartition)
+ {
+ this.minTuplesPerSecPerPartition = minTuplesPerSecPerPartition;
+ }
+
+ public long getCoolDownMillis()
+ {
+ return coolDownMillis;
+ }
+
+ public void setCoolDownMillis(long coolDownMillis)
+ {
+ this.coolDownMillis = coolDownMillis;
+ }
+
+ public int getMaxS3UploadPartitions()
+ {
+ return maxS3UploadPartitions;
+ }
+
+ public void setMaxS3UploadPartitions(int maxS3UploadPartitions)
+ {
+ this.maxS3UploadPartitions = maxS3UploadPartitions;
+ }
+
+ public int getMinS3UploadPartitions()
+ {
+ return minS3UploadPartitions;
+ }
+
+ public void setMinS3UploadPartitions(int minS3UploadPartitions)
+ {
+ this.minS3UploadPartitions = minS3UploadPartitions;
+ }
+
+ public int getMaxQueueSizeS3Upload()
+ {
+ return maxQueueSizeS3Upload;
+ }
+
+ public void setMaxQueueSizeS3Upload(int maxQueueSizeS3Upload)
+ {
+ this.maxQueueSizeS3Upload = maxQueueSizeS3Upload;
+ }
+
+ /**
+ * Converter for conversion of input tuples to byte[]
+ *
+ * @return converter
+ */
+ protected abstract Converter<INPUT, byte[]> getConverter();
+
+ public static class S3BytesOutputModule extends S3TupleOutputModule<byte[]>
+ {
+ @Override
+ protected Converter<byte[], byte[]> getConverter()
+ {
+ return new NoOpConverter();
+ }
+ }
+
+ public static class S3StringOutputModule extends S3TupleOutputModule<String>
+ {
+ @Override
+ protected Converter<String, byte[]> getConverter()
+ {
+ return new StringToBytesConverter();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java
new file mode 100644
index 0000000..43a8f6c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class FSRecordCompactionOperatorTest
+{
+
+  /**
+   * JUnit rule that, for each test method, builds a fresh {@link FSRecordCompactionOperator} using a
+   * per-method APPLICATION_PATH under {@code target/}. When the method finishes, it tears the operator
+   * down and deletes the per-class directory.
+   *
+   * Declared static because it never uses the enclosing test instance and should not hold a reference to it.
+   */
+  private static class TestMeta extends TestWatcher
+  {
+    FSRecordCompactionOperator<byte[]> underTest;
+    Context.OperatorContext context;
+    String outputPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File("target" + Path.SEPARATOR + description.getClassName() + Path.SEPARATOR
+          + description.getMethodName()).getPath();
+
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName());
+      attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      underTest = new FSRecordCompactionOperator<>();
+      underTest.setConverter(new GenericFileOutputOperator.NoOpConverter());
+      // Configure properties before setup(), as the platform does. A value set after setup()
+      // cannot be observed by setup() itself.
+      underTest.setMaxIdleWindows(10);
+      underTest.setup(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      // underTest is still null if starting() failed before creating it. Skip teardown so that
+      // failure is not masked by a NullPointerException.
+      if (underTest != null) {
+        underTest.teardown();
+      }
+      try {
+        FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void testRotate() throws Exception
+  {
+    CollectorTestSink<FSRecordCompactionOperator.OutputMetaData> sink = new CollectorTestSink<>();
+    // Raw cast: the output port's sink type does not match the typed collector directly.
+    testMeta.underTest.output.setSink((CollectorTestSink)sink);
+
+    // Windows 0-9 each carry one record and windows 10-59 are idle. With maxIdleWindows = 10, the
+    // idle stretch is what this test expects to finalize the file.
+    for (int i = 0; i < 60; i++) {
+      testMeta.underTest.beginWindow(i);
+      if (i < 10) {
+        testMeta.underTest.input.process(("Record" + Integer.toString(i)).getBytes());
+      }
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(59);
+    for (int i = 60; i < 70; i++) {
+      testMeta.underTest.beginWindow(i);
+      testMeta.underTest.endWindow();
+    }
+
+    // Report a clear failure instead of an IndexOutOfBoundsException when nothing was emitted.
+    Assert.assertFalse("expected at least one OutputMetaData tuple to be emitted",
+        sink.collectedTuples.isEmpty());
+    Assert.assertEquals("tuples-" + testMeta.context.getAttributes().get(DAG.DAGContext.APPLICATION_ID)
+        + "_1.0", sink.collectedTuples.get(0).getFileName());
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
new file mode 100644
index 0000000..f1acb9b
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class S3ReconcilerTest
+{
+
+ private class TestMeta extends TestWatcher
+ {
+ S3Reconciler underTest;
+ Context.OperatorContext context;
+
+ @Mock
+ AmazonS3 s3clientMock;
+ String outputPath;
+
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ outputPath = new File(
+ "target" + Path.SEPARATOR + description.getClassName() + Path.SEPARATOR + description.getMethodName())
+ .getPath();
+
+ Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName());
+ attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath);
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+ underTest = new S3Reconciler();
+ underTest.setAccessKey("");
+ underTest.setSecretKey("");
+
+ underTest.setup(context);
+
+ MockitoAnnotations.initMocks(this);
+ when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(null);
+ underTest.setS3client(s3clientMock);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ this.underTest.teardown();
+ try {
+ FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Test
+ public void testFileClearing() throws Exception
+ {
+ String fileName = "s3-compaction_1.0";
+ String path = testMeta.outputPath + Path.SEPARATOR + fileName;
+ long size = 80;
+
+ File file = new File(path);
+ File tmpFile = new File(path + "." + System.currentTimeMillis() + ".tmp");
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < 10; i++) {
+ sb.append("Record" + i + "\n");
+ if (i == 5) {
+ FileUtils.write(tmpFile, sb.toString());
+ }
+ }
+ FileUtils.write(file, sb.toString());
+
+ FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size);
+ testMeta.underTest.beginWindow(0);
+ testMeta.underTest.input.process(outputMetaData);
+ testMeta.underTest.endWindow();
+
+ for (int i = 1; i < 60; i++) {
+ testMeta.underTest.beginWindow(i);
+ testMeta.underTest.endWindow();
+ }
+ testMeta.underTest.committed(59);
+ for (int i = 60; i < 70; i++) {
+ testMeta.underTest.beginWindow(i);
+ Thread.sleep(10);
+ testMeta.underTest.endWindow();
+ }
+ Collection<File> files =
+ FileUtils.listFiles(new File(testMeta.outputPath), null, true);
+ Assert.assertEquals(0, files.size());
+ }
+}