Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/25 16:40:34 UTC

apex-malhar git commit: APEXMALHAR-2369 S3 output-tuple-based

Repository: apex-malhar
Updated Branches:
  refs/heads/master dd5341f22 -> ec7b480ac


APEXMALHAR-2369 S3 output-tuple-based


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ec7b480a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ec7b480a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ec7b480a

Branch: refs/heads/master
Commit: ec7b480ac48402f1c1b0bc765c5730776ff4f15e
Parents: dd5341f
Author: yogidevendra <yo...@apache.org>
Authored: Tue Dec 20 15:38:51 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Fri Feb 24 12:32:12 2017 +0530

----------------------------------------------------------------------
 .../lib/io/fs/AbstractReconciler.java           |  10 +
 .../StatsAwareStatelessPartitioner.java         |   2 +-
 .../lib/fs/FSRecordCompactionOperator.java      | 177 +++++++++
 .../apex/malhar/lib/fs/s3/S3Reconciler.java     | 316 +++++++++++++++
 .../lib/fs/s3/S3ReconcilerQueuePartitioner.java | 116 ++++++
 .../malhar/lib/fs/s3/S3TupleOutputModule.java   | 398 +++++++++++++++++++
 .../lib/fs/FSRecordCompactionOperatorTest.java  | 103 +++++
 .../apex/malhar/lib/fs/s3/S3ReconcilerTest.java | 138 +++++++
 8 files changed, 1259 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
index 0d67c31..78f340e 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java
@@ -25,10 +25,14 @@ import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.Operator.CheckpointNotificationListener;
@@ -76,6 +80,10 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
   private transient volatile boolean execute;
   private transient AtomicReference<Throwable> cause;
 
+  @AutoMetric
+  private long queueLength;
+
+
   @Override
   public void setup(Context.OperatorContext context)
   {
@@ -175,6 +183,7 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
             }
             QUEUETUPLE output = waitingTuples.remove();
             processCommittedData(output);
+            --queueLength;
             doneTuples.add(output);
           }
         } catch (Throwable e) {
@@ -195,6 +204,7 @@ public abstract class AbstractReconciler<INPUT, QUEUETUPLE> extends BaseOperator
   protected void enqueueForProcessing(QUEUETUPLE queueTuple)
   {
     currentWindowTuples.get(currentWindowId).add(queueTuple);
+    ++queueLength;
   }
 
   /**
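
For reference, a minimal sketch (not part of this commit; class name is illustrative) of a StatsListener that reads the new queueLength auto-metric. It follows the same metrics lookup that S3ReconcilerQueuePartitioner.getLoad() performs further down in this commit:

import java.util.List;

import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;

// Illustrative sketch: reads the queueLength auto-metric published by AbstractReconciler.
public class QueueLengthLogger implements StatsListener
{
  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    List<Stats.OperatorStats> windowedStats = stats.getLastWindowedStats();
    if (windowedStats != null && !windowedStats.isEmpty()) {
      // @AutoMetric fields are published under their field name in OperatorStats.metrics
      Long queueLength = (Long)windowedStats.get(windowedStats.size() - 1).metrics.get("queueLength");
      System.out.println("queueLength = " + queueLength);
    }
    return new Response();
  }
}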

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
index ed571b4..bf648c7 100644
--- a/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatsAwareStatelessPartitioner.java
@@ -64,7 +64,7 @@ public abstract class StatsAwareStatelessPartitioner<T extends Operator> impleme
   private long nextMillis;
   private long partitionNextMillis;
   private boolean repartition;
-  private transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
+  protected transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
   @Min(1)
   private int initialPartitionCount = 1;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
new file mode 100644
index 0000000..1dd12c4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperator.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * This operator writes incoming tuples to files.
+ * Metadata about the finalized files is emitted on the output port for downstream processing (if any).
+ *
+ * @param <INPUT>
+ *          Type of incoming tuples. A converter must be defined to convert
+ *          these tuples to byte[]. Default converters for String and byte[]
+ *          tuples are provided in S3TupleOutputModule.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordCompactionOperator<INPUT> extends GenericFileOutputOperator<INPUT>
+{
+
+  /**
+   * Output port for emitting metadata for finalized files.
+   */
+  public transient DefaultOutputPort<OutputMetaData> output = new DefaultOutputPort<OutputMetaData>();
+
+  /**
+   * Queue for holding finalized files for emitting on output port
+   */
+  private Queue<OutputMetaData> emitQueue = new LinkedBlockingQueue<OutputMetaData>();
+
+  @NotNull
+  String outputDirectoryName = "COMPACTION_OUTPUT_DIR";
+
+  @NotNull
+  String outputFileNamePrefix = "tuples-";
+
+  public FSRecordCompactionOperator()
+  {
+    filePath = "";
+    outputFileName = outputFileNamePrefix;
+    maxLength = 128 * 1024 * 1024L;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    filePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + outputDirectoryName;
+    outputFileName = outputFileNamePrefix + context.getValue(DAG.APPLICATION_ID);
+    super.setup(context);
+  }
+
+  @Override
+  protected void finalizeFile(String fileName) throws IOException
+  {
+    super.finalizeFile(fileName);
+
+    String src = filePath + Path.SEPARATOR + fileName;
+    Path srcPath = new Path(src);
+    long offset = fs.getFileStatus(srcPath).getLen();
+
+    //Add finalized files to the queue
+    OutputMetaData metaData = new OutputMetaData(src, fileName, offset);
+    //finalizeFile is called from the committed callback.
+    //Tuples should be emitted only between beginWindow and endWindow, hence the emitQueue.
+    emitQueue.add(metaData);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    //Emit finalized files from the queue
+    while (!emitQueue.isEmpty()) {
+      output.emit(emitQueue.poll());
+    }
+  }
+
+  public String getOutputDirectoryName()
+  {
+    return outputDirectoryName;
+  }
+
+  public void setOutputDirectoryName(@NotNull String outputDirectoryName)
+  {
+    this.outputDirectoryName = Preconditions.checkNotNull(outputDirectoryName);
+  }
+
+  public String getOutputFileNamePrefix()
+  {
+    return outputFileNamePrefix;
+  }
+
+  public void setOutputFileNamePrefix(@NotNull String outputFileNamePrefix)
+  {
+    this.outputFileNamePrefix = Preconditions.checkNotNull(outputFileNamePrefix);
+  }
+
+  /**
+   * Metadata for output file for downstream processing
+   */
+  public static class OutputMetaData
+  {
+    private String path;
+    private String fileName;
+    private long size;
+
+    public OutputMetaData()
+    {
+    }
+
+    public OutputMetaData(String path, String fileName, long size)
+    {
+      this.path = path;
+      this.fileName = fileName;
+      this.size = size;
+    }
+
+    public String getPath()
+    {
+      return path;
+    }
+
+    public void setPath(String path)
+    {
+      this.path = path;
+    }
+
+    public String getFileName()
+    {
+      return fileName;
+    }
+
+    public void setFileName(String fileName)
+    {
+      this.fileName = fileName;
+    }
+
+    public long getSize()
+    {
+      return size;
+    }
+
+    public void setSize(long size)
+    {
+      this.size = size;
+    }
+  }
+
+}
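
The OutputMetaData emitted on the output port can be consumed by any downstream operator; a minimal sketch of such a consumer (not part of this commit; class name is illustrative, and S3Reconciler below is the consumer actually used by S3TupleOutputModule):

import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator.OutputMetaData;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

// Illustrative sketch: logs the path, name and size of each finalized part file.
public class OutputMetaDataLogger extends BaseOperator
{
  public final transient DefaultInputPort<OutputMetaData> input = new DefaultInputPort<OutputMetaData>()
  {
    @Override
    public void process(OutputMetaData metaData)
    {
      System.out.println(metaData.getFileName() + " (" + metaData.getSize() + " bytes) at " + metaData.getPath());
    }
  };
}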

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
new file mode 100644
index 0000000..5fd19f9
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.io.fs.AbstractReconciler;
+
+/**
+ * This operator uploads files to Amazon S3 after files are finalized and
+ * frozen by the committed callback.
+ *
+ * S3TupleOutputModule uses this operator in conjunction with FSRecordCompactionOperator.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.OutputMetaData, FSRecordCompactionOperator.OutputMetaData>
+{
+  /**
+   * Access key id for Amazon S3
+   */
+  @NotNull
+  private String accessKey;
+
+  /**
+   * Secret key for Amazon S3
+   */
+  @NotNull
+  private String secretKey;
+
+  /**
+   * Bucket name for data upload
+   */
+  @NotNull
+  private String bucketName;
+
+  /**
+   * S3 End point
+   */
+  private String endPoint;
+
+  /**
+   * Directory name under S3 bucket
+   */
+  @NotNull
+  private String directoryName;
+
+  /**
+   * Client instance for connecting to Amazon S3
+   */
+  protected transient AmazonS3 s3client;
+
+  /**
+   * FileSystem instance for reading intermediate directory
+   */
+  protected transient FileSystem fs;
+
+  protected transient String filePath;
+
+  private static final String TMP_EXTENSION = ".tmp";
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+    filePath = context.getValue(DAG.APPLICATION_PATH);
+    try {
+      fs = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration());
+    } catch (IOException e) {
+      logger.error("Unable to create FileSystem: {}", e.getMessage());
+    }
+    super.setup(context);
+  }
+
+  /**
+   * Enqueues the tuple for processing after the committed callback
+   */
+  @Override
+  protected void processTuple(FSRecordCompactionOperator.OutputMetaData outputMetaData)
+  {
+    logger.debug("enqueue : {}", outputMetaData);
+    enqueueForProcessing(outputMetaData);
+  }
+
+  /**
+   * Uploads the file to Amazon S3 using the putObject API of the S3 client
+   */
+  @Override
+  protected void processCommittedData(FSRecordCompactionOperator.OutputMetaData outputMetaData)
+  {
+    try {
+      Path path = new Path(outputMetaData.getPath());
+      if (!fs.exists(path)) {
+        logger.debug("Ignoring non-existent path assuming replay : {}", path);
+        return;
+      }
+      FSDataInputStream fsinput = fs.open(path);
+      ObjectMetadata omd = new ObjectMetadata();
+      omd.setContentLength(outputMetaData.getSize());
+      String keyName = directoryName + Path.SEPARATOR + outputMetaData.getFileName();
+      PutObjectRequest request = new PutObjectRequest(bucketName, keyName, fsinput, omd);
+      if (outputMetaData.getSize() < Integer.MAX_VALUE) {
+        request.getRequestClientOptions().setReadLimit((int)outputMetaData.getSize());
+      } else {
+        throw new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE");
+      }
+      if (fs.exists(path)) {
+        logger.debug("Trying to upload : {}", path);
+        s3client.putObject(request);
+        logger.debug("Uploading : {}", keyName);
+      }
+    } catch (FileNotFoundException e) {
+      logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath());
+    } catch (IOException e) {
+      logger.error("Unable to create Stream: {}", e.getMessage());
+    }
+  }
+
+  /**
+   * Clears intermediate/temporary files if any
+   */
+  @Override
+  public void endWindow()
+  {
+    logger.info("in endWindow()");
+    while (doneTuples.peek() != null) {
+      FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll();
+      logger.debug("found metaData = {}", metaData);
+      committedTuples.remove(metaData);
+      try {
+        Path dest = new Path(metaData.getPath());
+        //Delete the intermediate file. While writing to tmp files,
+        //stray tmp files may be left behind and also have to be cleaned up.
+        FileStatus[] statuses = fs.listStatus(dest.getParent());
+
+        for (FileStatus status : statuses) {
+          String statusName = status.getPath().getName();
+          if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) {
+            //a tmp file name is the base file name followed by a timestamp and the tmp extension
+            String actualFileName = statusName.substring(0,
+                statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
+            logger.debug("actualFileName = {}", actualFileName);
+            if (metaData.getFileName().equals(actualFileName)) {
+              logger.debug("deleting stray file {}", statusName);
+              fs.delete(status.getPath(), true);
+            }
+          } else if (statusName.equals(metaData.getFileName())) {
+            logger.info("deleting s3-compaction file {}", statusName);
+            fs.delete(status.getPath(), true);
+          }
+        }
+      } catch (IOException e) {
+        logger.error("Unable to delete file: {}", metaData.getFileName());
+      }
+    }
+  }
+
+  /**
+   * Get access key id
+   *
+   * @return Access key id for Amazon S3
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set access key id
+   *
+   * @param accessKey
+   *          Access key id for Amazon S3
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Get secret key
+   *
+   * @return Secret key for Amazon S3
+   */
+  public String getSecretKey()
+  {
+    return secretKey;
+  }
+
+  /**
+   * Set secret key
+   *
+   * @param secretKey
+   *          Secret key for Amazon S3
+   */
+  public void setSecretKey(@NotNull String secretKey)
+  {
+    this.secretKey = Preconditions.checkNotNull(secretKey);
+  }
+
+  /**
+   * Get bucket name
+   *
+   * @return Bucket name for data upload
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set bucket name
+   *
+   * @param bucketName
+   *          Bucket name for data upload
+   */
+  public void setBucketName(@NotNull String bucketName)
+  {
+    this.bucketName = Preconditions.checkNotNull(bucketName);
+  }
+
+  /**
+   * Get directory name
+   *
+   * @return Directory name under S3 bucket
+   */
+  public String getDirectoryName()
+  {
+    return directoryName;
+  }
+
+  /**
+   * Set directory name
+   *
+   * @param directoryName
+   *          Directory name under S3 bucket
+   */
+  public void setDirectoryName(@NotNull String directoryName)
+  {
+    this.directoryName = Preconditions.checkNotNull(directoryName);
+  }
+
+  /**
+   * Return the S3 End point
+   *
+   * @return S3 End point
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+
+  /**
+   * Set the S3 End point
+   *
+   * @param endPoint
+   *          S3 end point
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = endPoint;
+  }
+
+  /**
+   * Set Amazon S3 client
+   *
+   * @param s3client
+   *          Client for Amazon S3
+   */
+  @VisibleForTesting
+  void setS3client(AmazonS3 s3client)
+  {
+    this.s3client = s3client;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(S3Reconciler.class);
+
+}
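
For deployments that target a non-default S3 endpoint, the configured endPoint also has to be applied to the client; a minimal sketch of doing so in a subclass (not part of this commit; assumes the AWS SDK v1 AmazonS3.setEndpoint(String) API):

import com.datatorrent.api.Context;

// Illustrative sketch: applies the configured endPoint to the protected s3client field.
public class S3ReconcilerWithEndpoint extends S3Reconciler
{
  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
    if (getEndPoint() != null) {
      s3client.setEndpoint(getEndPoint());
    }
  }
}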

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java
new file mode 100644
index 0000000..edd0054
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerQueuePartitioner.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.util.List;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Stats;
+import com.datatorrent.lib.partitioner.StatsAwareStatelessPartitioner;
+
+/**
+ * This partitioner looks at the reconciler queue size to decide the number of partitions.
+ * It is used for the S3Reconciler operator.
+ * @param <T> type of the operator being partitioned
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class S3ReconcilerQueuePartitioner<T extends Operator> extends StatsAwareStatelessPartitioner<T>
+{
+  private static final long serialVersionUID = -4407806429128758992L;
+
+  private int maxPartitions = 16;
+  private int minPartitions = 1;
+
+  private int maxQueueSizePerPartition = 4;
+
+  @Override
+  protected int getLoad(BatchedOperatorStats stats)
+  {
+    double totalBacklog = 0;
+    double statsPartitionCount = 0;
+    for (Map.Entry<Integer, BatchedOperatorStats> partitionStatus : partitionedInstanceStatus.entrySet()) {
+      BatchedOperatorStats batchedOperatorStats = partitionStatus.getValue();
+      if (batchedOperatorStats != null) {
+        List<Stats.OperatorStats> lastWindowedStats = batchedOperatorStats.getLastWindowedStats();
+        if (lastWindowedStats != null && lastWindowedStats.size() > 0) {
+          Stats.OperatorStats lastStats = lastWindowedStats.get(lastWindowedStats.size() - 1);
+          Long queueLength = (Long)lastStats.metrics.get("queueLength");
+          totalBacklog += queueLength;
+          statsPartitionCount += 1;
+          logger.debug("queueLength : {}, totalBacklog : {}, statsPartitionCount : {}", queueLength, totalBacklog,
+              statsPartitionCount);
+        }
+      }
+    }
+
+    double backlogPerPartition = totalBacklog / statsPartitionCount;
+    logger.debug("backlogPerPartition : {}", backlogPerPartition);
+    logger.debug("maxQueueSizePerPartition : {}, partitionedInstanceStatus.size() : {}, maxPartitions : {}",
+        maxQueueSizePerPartition, partitionedInstanceStatus.size(), maxPartitions);
+
+    if (backlogPerPartition > maxQueueSizePerPartition && partitionedInstanceStatus.size() < maxPartitions) {
+      return 1;
+    }
+    logger.debug("minPartitions:{}", minPartitions);
+
+    if (backlogPerPartition < 1.1 && partitionedInstanceStatus.size() > minPartitions) {
+      return -1;
+    }
+
+    return 0;
+  }
+
+  public int getMaxPartitions()
+  {
+    return maxPartitions;
+  }
+
+  public void setMaxPartitions(int maxPartitions)
+  {
+    this.maxPartitions = maxPartitions;
+  }
+
+  public int getMinPartitions()
+  {
+    return minPartitions;
+  }
+
+  public void setMinPartitions(int minPartitions)
+  {
+    this.minPartitions = minPartitions;
+  }
+
+  public int getMaxQueueSizePerPartition()
+  {
+    return maxQueueSizePerPartition;
+  }
+
+  public void setMaxQueueSizePerPartition(int maxQueueSizePerPartition)
+  {
+    this.maxQueueSizePerPartition = maxQueueSizePerPartition;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(S3ReconcilerQueuePartitioner.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
new file mode 100644
index 0000000..59cd046
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.util.Arrays;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.NoOpConverter;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringToBytesConverter;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.lib.converter.Converter;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
+
+/**
+ * S3TupleOutputModule writes incoming tuples into files and uploads these files to Amazon S3.
+ *
+ * @param <INPUT> Type of incoming tuple. A converter needs to be defined to convert these tuples to byte[].
+ * Default converters for String and byte[] tuples are provided in
+ * S3TupleOutputModule.S3BytesOutputModule and S3TupleOutputModule.S3StringOutputModule.
+ *
+ * @displayName S3 Tuple Output Module
+ * @tags S3, Output
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public abstract class S3TupleOutputModule<INPUT> implements Module
+{
+  public final transient ProxyInputPort<INPUT> input = new ProxyInputPort<INPUT>();
+
+  /**
+   * AWS access key
+   */
+  @NotNull
+  private String accessKey;
+  /**
+   * AWS secret access key
+   */
+  @NotNull
+  private String secretAccessKey;
+
+  /**
+   * S3 End point
+   */
+  private String endPoint;
+  /**
+   * Name of the bucket in which to upload the files
+   */
+  @NotNull
+  private String bucketName;
+
+  /**
+   * Path of the output directory. The relative path of the copied files is
+   * maintained with respect to the source directory and the output directory.
+   */
+  @NotNull
+  private String outputDirectoryPath;
+
+  /**
+   * Maximum number of idle windows during which no new data is added to the current
+   * part file. The part file is finalized after this many idle windows following the
+   * last new data.
+   */
+  private long maxIdleWindows = 30;
+
+  /**
+   * The maximum length in bytes of a rolling file. The default value is
+   * 128 MB.
+   */
+  @Min(1)
+  protected Long maxLength = 128 * 1024 * 1024L;
+
+  /**
+   * Maximum number of tuples per sec per partition for HDFS write.
+   */
+  private long maxTuplesPerSecPerPartition = 300000;
+
+  /**
+   * Minimum number of tuples per sec per partition for HDFS write.
+   */
+  private long minTuplesPerSecPerPartition = 30000;
+
+  /**
+   * Time interval in milliseconds to check for repartitioning
+   */
+  private long coolDownMillis = 1 * 60 * 1000;
+
+  /**
+   * Maximum number of S3 upload partitions
+   */
+  private int maxS3UploadPartitions = 16;
+
+  /**
+   * Minimum number of S3 upload partitions
+   */
+  private int minS3UploadPartitions = 1;
+
+  /**
+   * Maximum queue size for S3 upload
+   */
+  private int maxQueueSizeS3Upload = 4;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    FSRecordCompactionOperator<INPUT> s3compaction = dag.addOperator("S3Compaction", new FSRecordCompactionOperator<INPUT>());
+    s3compaction.setConverter(getConverter());
+    s3compaction.setMaxIdleWindows(maxIdleWindows);
+    s3compaction.setMaxLength(maxLength);
+
+    StatelessThroughputBasedPartitioner<FSRecordCompactionOperator<INPUT>> partitioner = new StatelessThroughputBasedPartitioner<FSRecordCompactionOperator<INPUT>>();
+    partitioner.setMaximumEvents(maxTuplesPerSecPerPartition);
+    partitioner.setMinimumEvents(minTuplesPerSecPerPartition);
+    partitioner.setCooldownMillis(coolDownMillis);
+    dag.setAttribute(s3compaction, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[] {partitioner}));
+    dag.setAttribute(s3compaction, OperatorContext.PARTITIONER, partitioner);
+
+    S3Reconciler s3Reconciler = dag.addOperator("S3Reconciler", new S3Reconciler());
+    s3Reconciler.setAccessKey(accessKey);
+    s3Reconciler.setSecretKey(secretAccessKey);
+    s3Reconciler.setBucketName(bucketName);
+    s3Reconciler.setEndPoint(endPoint);
+    s3Reconciler.setDirectoryName(outputDirectoryPath);
+
+    S3ReconcilerQueuePartitioner<S3Reconciler> reconcilerPartitioner = new S3ReconcilerQueuePartitioner<S3Reconciler>();
+    reconcilerPartitioner.setCooldownMillis(coolDownMillis);
+    reconcilerPartitioner.setMinPartitions(minS3UploadPartitions);
+    reconcilerPartitioner.setMaxPartitions(maxS3UploadPartitions);
+    reconcilerPartitioner.setMaxQueueSizePerPartition(maxQueueSizeS3Upload);
+
+    dag.setAttribute(s3Reconciler, OperatorContext.STATS_LISTENERS,
+        Arrays.asList(new StatsListener[] {reconcilerPartitioner}));
+    dag.setAttribute(s3Reconciler, OperatorContext.PARTITIONER, reconcilerPartitioner);
+
+    if (endPoint != null) {
+      s3Reconciler.setEndPoint(endPoint);
+    }
+    dag.addStream("write-to-s3", s3compaction.output, s3Reconciler.input);
+    input.set(s3compaction.input);
+  }
+
+  /**
+   * Get the AWS access key
+   *
+   * @return AWS access key
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set the AWS access key
+   *
+   * @param accessKey
+   *          access key
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Return the AWS secret access key
+   *
+   * @return AWS secret access key
+   */
+  public String getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  /**
+   * Set the AWS secret access key
+   *
+   * @param secretAccessKey
+   *          AWS secret access key
+   */
+  public void setSecretAccessKey(@NotNull String secretAccessKey)
+  {
+    this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey);
+  }
+
+  /**
+   * Get the name of the bucket in which to upload the files
+   *
+   * @return bucket name
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the name of the bucket in which to upload the files
+   *
+   * @param bucketName
+   *          name of the bucket
+   */
+  public void setBucketName(@NotNull String bucketName)
+  {
+    this.bucketName = Preconditions.checkNotNull(bucketName);
+  }
+
+  /**
+   * Return the S3 End point
+   *
+   * @return S3 End point
+   */
+  public String getEndPoint()
+  {
+    return endPoint;
+  }
+
+  /**
+   * Set the S3 End point
+   *
+   * @param endPoint
+   *          S3 end point
+   */
+  public void setEndPoint(String endPoint)
+  {
+    this.endPoint = Preconditions.checkNotNull(endPoint);
+  }
+
+  /**
+   * Get the path of the output directory.
+   *
+   * @return path of output directory
+   */
+  public String getOutputDirectoryPath()
+  {
+    return outputDirectoryPath;
+  }
+
+  /**
+   * Set the path of the output directory.
+   *
+   * @param outputDirectoryPath
+   *          path of output directory
+   */
+  public void setOutputDirectoryPath(@NotNull String outputDirectoryPath)
+  {
+    this.outputDirectoryPath = Preconditions.checkNotNull(outputDirectoryPath);
+  }
+
+  /**
+   * Number of idle windows after which the file is rolled over
+   *
+   * @return max number of idle windows for rollover
+   */
+  public long getMaxIdleWindows()
+  {
+    return maxIdleWindows;
+  }
+
+  /**
+   * Number of idle windows after which the file is rolled over
+   *
+   * @param maxIdleWindows
+   *          max number of idle windows for rollover
+   */
+  public void setMaxIdleWindows(long maxIdleWindows)
+  {
+    this.maxIdleWindows = maxIdleWindows;
+  }
+
+  /**
+   * Get max length of file after which file should be rolled over
+   *
+   * @return max length of file
+   */
+  public Long getMaxLength()
+  {
+    return maxLength;
+  }
+
+  /**
+   * Set max length of file after which file should be rolled over
+   *
+   * @param maxLength
+   *          max length of file
+   */
+  public void setMaxLength(Long maxLength)
+  {
+    this.maxLength = maxLength;
+  }
+
+  public long getMaxTuplesPerSecPerPartition()
+  {
+    return maxTuplesPerSecPerPartition;
+  }
+
+  public void setMaxTuplesPerSecPerPartition(long maxTuplesPerSecPerPartition)
+  {
+    this.maxTuplesPerSecPerPartition = maxTuplesPerSecPerPartition;
+  }
+
+  public long getMinTuplesPerSecPerPartition()
+  {
+    return minTuplesPerSecPerPartition;
+  }
+
+  public void setMinTuplesPerSecPerPartition(long minTuplesPerSecPerPartition)
+  {
+    this.minTuplesPerSecPerPartition = minTuplesPerSecPerPartition;
+  }
+
+  public long getCoolDownMillis()
+  {
+    return coolDownMillis;
+  }
+
+  public void setCoolDownMillis(long coolDownMillis)
+  {
+    this.coolDownMillis = coolDownMillis;
+  }
+
+  public int getMaxS3UploadPartitions()
+  {
+    return maxS3UploadPartitions;
+  }
+
+  public void setMaxS3UploadPartitions(int maxS3UploadPartitions)
+  {
+    this.maxS3UploadPartitions = maxS3UploadPartitions;
+  }
+
+  public int getMinS3UploadPartitions()
+  {
+    return minS3UploadPartitions;
+  }
+
+  public void setMinS3UploadPartitions(int minS3UploadPartitions)
+  {
+    this.minS3UploadPartitions = minS3UploadPartitions;
+  }
+
+  public int getMaxQueueSizeS3Upload()
+  {
+    return maxQueueSizeS3Upload;
+  }
+
+  public void setMaxQueueSizeS3Upload(int maxQueueSizeS3Upload)
+  {
+    this.maxQueueSizeS3Upload = maxQueueSizeS3Upload;
+  }
+
+  /**
+   * Converter for conversion of input tuples to byte[]
+   *
+   * @return converter
+   */
+  protected abstract Converter<INPUT, byte[]> getConverter();
+
+  public static class S3BytesOutputModule extends S3TupleOutputModule<byte[]>
+  {
+    @Override
+    protected Converter<byte[], byte[]> getConverter()
+    {
+      return new NoOpConverter();
+    }
+  }
+
+  public static class S3StringOutputModule extends S3TupleOutputModule<String>
+  {
+    @Override
+    protected Converter<String, byte[]> getConverter()
+    {
+      return new StringToBytesConverter();
+    }
+  }
+}
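
A minimal sketch of adding the module to an application DAG (not part of this commit; property values and the upstream operator are placeholders):

import org.apache.apex.malhar.lib.fs.s3.S3TupleOutputModule;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

// Illustrative sketch: wires the S3 string output module into an application.
public class S3OutputApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    S3TupleOutputModule.S3StringOutputModule s3Output =
        dag.addModule("S3Output", new S3TupleOutputModule.S3StringOutputModule());
    s3Output.setAccessKey("<access-key>");
    s3Output.setSecretAccessKey("<secret-key>");
    s3Output.setBucketName("<bucket-name>");
    s3Output.setOutputDirectoryPath("<directory-under-bucket>");

    // Connect an upstream operator that emits String tuples, e.g.:
    // dag.addStream("data-to-s3", lineReader.output, s3Output.input);
  }
}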

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java
new file mode 100644
index 0000000..43a8f6c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordCompactionOperatorTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+public class FSRecordCompactionOperatorTest
+{
+
+  private class TestMeta extends TestWatcher
+  {
+    FSRecordCompactionOperator<byte[]> underTest;
+    Context.OperatorContext context;
+    String outputPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath();
+
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName());
+      attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      underTest = new FSRecordCompactionOperator<byte[]>();
+      underTest.setConverter(new GenericFileOutputOperator.NoOpConverter());
+      underTest.setup(context);
+      underTest.setMaxIdleWindows(10);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      this.underTest.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testRotate() throws Exception
+  {
+    CollectorTestSink<FSRecordCompactionOperator.OutputMetaData> sink = new CollectorTestSink<FSRecordCompactionOperator.OutputMetaData>();
+    testMeta.underTest.output.setSink((CollectorTestSink)sink);
+
+    for (int i = 0; i < 60; i++) {
+      testMeta.underTest.beginWindow(i);
+      if (i < 10) {
+        testMeta.underTest.input.process(("Record" + Integer.toString(i)).getBytes());
+      }
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(59);
+    for (int i = 60; i < 70; i++) {
+      testMeta.underTest.beginWindow(i);
+      testMeta.underTest.endWindow();
+    }
+
+    Assert.assertEquals("tuples-" + testMeta.context.getAttributes().get(DAG.DAGContext.APPLICATION_ID)
+        + "_1.0", sink.collectedTuples.get(0).getFileName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ec7b480a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
new file mode 100644
index 0000000..f1acb9b
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs.s3;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class S3ReconcilerTest
+{
+
+  private class TestMeta extends TestWatcher
+  {
+    S3Reconciler underTest;
+    Context.OperatorContext context;
+
+    @Mock
+    AmazonS3 s3clientMock;
+    String outputPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      outputPath = new File(
+          "target" + Path.SEPARATOR + description.getClassName() + Path.SEPARATOR + description.getMethodName())
+              .getPath();
+
+      Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.DAGContext.APPLICATION_ID, description.getClassName());
+      attributes.put(DAG.DAGContext.APPLICATION_PATH, outputPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      underTest = new S3Reconciler();
+      underTest.setAccessKey("");
+      underTest.setSecretKey("");
+
+      underTest.setup(context);
+
+      MockitoAnnotations.initMocks(this);
+      when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(null);
+      underTest.setS3client(s3clientMock);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      this.underTest.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target" + Path.SEPARATOR + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testFileClearing() throws Exception
+  {
+    String fileName = "s3-compaction_1.0";
+    String path = testMeta.outputPath + Path.SEPARATOR + fileName;
+    long size = 80;
+
+    File file = new File(path);
+    File tmpFile = new File(path + "." + System.currentTimeMillis() + ".tmp");
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < 10; i++) {
+      sb.append("Record" + i + "\n");
+      if (i == 5) {
+        FileUtils.write(tmpFile, sb.toString());
+      }
+    }
+    FileUtils.write(file, sb.toString());
+
+    FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size);
+    testMeta.underTest.beginWindow(0);
+    testMeta.underTest.input.process(outputMetaData);
+    testMeta.underTest.endWindow();
+
+    for (int i = 1; i < 60; i++) {
+      testMeta.underTest.beginWindow(i);
+      testMeta.underTest.endWindow();
+    }
+    testMeta.underTest.committed(59);
+    for (int i = 60; i < 70; i++) {
+      testMeta.underTest.beginWindow(i);
+      Thread.sleep(10);
+      testMeta.underTest.endWindow();
+    }
+    Collection<File> files =
+        FileUtils.listFiles(new File(testMeta.outputPath), null, true);
+    Assert.assertEquals(0, files.size());
+  }
+}