Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:20 UTC

[39/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
index 0a39461..b61a364 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -16,8 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.amazon.kinesis;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentials;
@@ -30,11 +37,7 @@ import com.amazonaws.services.kinesis.model.PutRecordResult;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.TypeConverterUtil;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,87 +51,94 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Created by sblackmon on 9/2/15.
+ * KinesisPersistWriter writes documents to kinesis.
  */
 public class KinesisPersistWriter implements StreamsPersistWriter {
 
-    public final static String STREAMS_ID = "KinesisPersistWriter";
+  public static final String STREAMS_ID = "KinesisPersistWriter";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+  private ObjectMapper mapper = new ObjectMapper();
 
-    private KinesisWriterConfiguration config;
+  private KinesisWriterConfiguration config;
 
-    private List<String> streamName;
+  private List<String> streamName;
 
-    private ExecutorService executor;
+  private ExecutorService executor;
 
-    protected AmazonKinesisClient client;
-
-    public KinesisPersistWriter() {
-        Config config = StreamsConfigurator.config.getConfig("kinesis");
-        this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
+  protected AmazonKinesisClient client;
 
-    public KinesisPersistWriter(KinesisWriterConfiguration config) {
-        this.config = config;
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
+  /**
+   * KinesisPersistWriter constructor - resolves KinesisWriterConfiguration from the JVM config path 'kinesis'.
+   */
+  public KinesisPersistWriter() {
+    Config config = StreamsConfigurator.config.getConfig("kinesis");
+    this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
+    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+  }
 
-    public void setConfig(KinesisWriterConfiguration config) {
-        this.config = config;
-    }
+  /**
+   * KinesisPersistWriter constructor - uses provided KinesisWriterConfiguration.
+   */
+  public KinesisPersistWriter(KinesisWriterConfiguration config) {
+    this.config = config;
+    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public void setConfig(KinesisWriterConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
+  @Override
+  public void write(StreamsDatum entry) {
 
-        PutRecordRequest putRecordRequest = new PutRecordRequest()
-                .withStreamName(config.getStream())
-                .withPartitionKey(entry.getId())
-                .withData(ByteBuffer.wrap(document.getBytes()));
+    String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
 
-        PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
+    PutRecordRequest putRecordRequest = new PutRecordRequest()
+        .withStreamName(config.getStream())
+        .withPartitionKey(entry.getId())
+        .withData(ByteBuffer.wrap(document.getBytes()));
 
-        entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
+    PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
 
-        LOGGER.debug("Wrote {}", entry);
-    }
+    entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
 
-    @Override
-    public void prepare(Object configurationObject) {
-        // Connect to Kinesis
-        synchronized (this) {
-            // Create the credentials Object
-            AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+    LOGGER.debug("Wrote {}", entry);
+  }
 
-            ClientConfiguration clientConfig = new ClientConfiguration();
-            clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+  @Override
+  public void prepare(Object configurationObject) {
+    // Connect to Kinesis
+    synchronized (this) {
+      // Create the credentials Object
+      AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
 
-            this.client = new AmazonKinesisClient(credentials, clientConfig);
-            if (!Strings.isNullOrEmpty(config.getRegion()))
-                this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
-        }
-        executor = Executors.newSingleThreadExecutor();
+      ClientConfiguration clientConfig = new ClientConfiguration();
+      clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
 
+      this.client = new AmazonKinesisClient(credentials, clientConfig);
+      if (!Strings.isNullOrEmpty(config.getRegion())) {
+        this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+      }
     }
+    executor = Executors.newSingleThreadExecutor();
+
+  }
 
-    @Override
-    public void cleanUp() {
-        try {
-            executor.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            LOGGER.debug("Interrupted! ", e);
-        }
+  @Override
+  public void cleanUp() {
+    try {
+      executor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      LOGGER.debug("Interrupted! ", ex);
     }
+  }
 }
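
For orientation, a minimal usage sketch of the writer above. Names and credentials are placeholders; the configuration setters are assumed to mirror the getters the diff exercises (getStream, getKey, getSecretKey, getRegion), and StreamsDatum is assumed to offer a (document, id) constructor:

  KinesisWriterConfiguration configuration = new KinesisWriterConfiguration();
  configuration.setStream("example-stream");   // assumed setter, mirrors getStream()
  configuration.setKey("AKIAEXAMPLE");         // placeholder AWS access key
  configuration.setSecretKey("secret");        // placeholder AWS secret key
  configuration.setRegion("us-east-1");
  // prepare() also calls config.getProtocol(), so that field must be populated as well

  KinesisPersistWriter writer = new KinesisPersistWriter(configuration);
  writer.prepare(null);   // builds the AmazonKinesisClient under a lock
  writer.write(new StreamsDatum("{\"hello\":\"world\"}", "partition-key-1"));
  writer.cleanUp();       // waits up to 5 seconds for the executor to finish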

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index c13314d..f34782a 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -15,10 +15,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.s3;
 
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,125 +36,129 @@ import java.io.InputStream;
  * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file
  * this becomes problematic.
  *
+ * <p/>
  * This class operates as a wrapper to fix the aforementioned nuances.
  *
+ * <p/>
  * Reference:
  * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
  */
-public class S3ObjectInputStreamWrapper extends InputStream
-{
-    private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class);
-
-    private final S3Object s3Object;
-    private final S3ObjectInputStream is;
-    private boolean isClosed = false;
-
-    /**
-     * Create an input stream safely from
-     * @param s3Object
-     */
-    public S3ObjectInputStreamWrapper(S3Object s3Object) {
-        this.s3Object = s3Object;
-        this.is = this.s3Object.getObjectContent();
-    }
-
-    public int hashCode() {
-        return this.is.hashCode();
-    }
-
-    public boolean equals(Object obj) {
-        return this.is.equals(obj);
-    }
-
-    public String toString() {
-        return this.is.toString();
-    }
-
-    public int read() throws IOException {
-        return this.is.read();
-    }
-
-    public int read(byte[] b) throws IOException {
-        return this.is.read(b);
-    }
-
-    public int read(byte[] b, int off, int len) throws IOException {
-        return this.is.read(b, off, len);
-    }
-
-    public long skip(long n) throws IOException {
-        return this.is.skip(n);
-    }
-
-    public int available() throws IOException {
-        return this.is.available();
-    }
-
-    public boolean markSupported() {
-        return this.is.markSupported();
-    }
-
-    public synchronized void mark(int readlimit) {
-        this.is.mark(readlimit);
-    }
-
-    public synchronized void reset() throws IOException {
-        this.is.reset();
-    }
-
-    public void close() throws IOException {
-        ensureEverythingIsReleased();
-    }
+public class S3ObjectInputStreamWrapper extends InputStream {
 
-    public void ensureEverythingIsReleased() {
-        if(this.isClosed)
-            return;
-
-
-        try {
-            // ensure that the S3 Object is closed properly.
-            this.s3Object.close();
-        } catch(Throwable e) {
-            LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
-        }
-
-
-        try {
-            // Abort the stream
-            this.is.abort();
-        }
-        catch(Throwable e) {
-            LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
-        }
-
-        // close the input Stream Safely
-        closeSafely(this.is);
-
-        // This corrects the issue with Open HTTP connections
-        closeSafely(this.s3Object);
-        this.isClosed = true;
-    }
-
-    private static void closeSafely(Closeable is) {
-        try {
-            if(is != null)
-                is.close();
-        } catch(Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage());
-        }
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class);
+
+  private final S3Object s3Object;
+  private final S3ObjectInputStream is;
+  private boolean isClosed = false;
 
-    protected void finalize( ) throws Throwable
-    {
-        try {
-            // If there is an accidental leak where the user did not close, call this on the classes destructor
-            ensureEverythingIsReleased();
-            super.finalize();
-        } catch(Exception e) {
-            // this should never be called, just being very cautious
-            LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage());
-        }
+  /**
+   * Create an input stream safely.
+   * @param s3Object s3Object
+   */
+  public S3ObjectInputStreamWrapper(S3Object s3Object) {
+    this.s3Object = s3Object;
+    this.is = this.s3Object.getObjectContent();
+  }
+
+  public int hashCode() {
+    return this.is.hashCode();
+  }
+
+  public boolean equals(Object obj) {
+    return this.is.equals(obj);
+  }
+
+  public String toString() {
+    return this.is.toString();
+  }
+
+  public int read() throws IOException {
+    return this.is.read();
+  }
+
+  public int read(byte[] byt) throws IOException {
+    return this.is.read(byt);
+  }
+
+  public int read(byte[] byt, int off, int len) throws IOException {
+    return this.is.read(byt, off, len);
+  }
+
+  public long skip(long skip) throws IOException {
+    return this.is.skip(skip);
+  }
+
+  public int available() throws IOException {
+    return this.is.available();
+  }
+
+  public boolean markSupported() {
+    return this.is.markSupported();
+  }
+
+  public synchronized void mark(int readlimit) {
+    this.is.mark(readlimit);
+  }
+
+  public synchronized void reset() throws IOException {
+    this.is.reset();
+  }
+
+  public void close() throws IOException {
+    ensureEverythingIsReleased();
+  }
+
+  /**
+   * ensureEverythingIsReleased as part of close process.
+   */
+  public void ensureEverythingIsReleased() {
+    if (this.isClosed) {
+      return;
+    }
+
+    try {
+      // ensure that the S3 Object is closed properly.
+      this.s3Object.close();
+    } catch (Throwable ex) {
+      LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), ex.getMessage());
+    }
+
+
+    try {
+      // Abort the stream
+      this.is.abort();
+    } catch (Throwable ex) {
+      LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), ex.getMessage());
+    }
+
+    // close the input Stream Safely
+    closeSafely(this.is);
+
+    // This corrects the issue with Open HTTP connections
+    closeSafely(this.s3Object);
+    this.isClosed = true;
+  }
+
+  private static void closeSafely(Closeable is) {
+    try {
+      if (is != null) {
+        is.close();
+      }
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", ex.getMessage());
+    }
+  }
+
+  protected void finalize() throws Throwable {
+    try {
+      // If there is an accidental leak where the user did not close, call this from the class's finalizer
+      ensureEverythingIsReleased();
+      super.finalize();
+    } catch (Exception ex) {
+      // this should never be called, just being very cautious
+      LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", ex.getMessage());
     }
+  }
 
 }
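
The nuance the wrapper fixes shows up when only part of a large object is read. A minimal sketch of the intended pattern, assuming an existing amazonS3Client and placeholder bucket/key names (this mirrors how S3PersistReaderTask below consumes the wrapper):

  // Read only the first line of a potentially huge object, then release the connection.
  S3Object s3Object = amazonS3Client.getObject("example-bucket", "path/large-file.tsv");
  BufferedReader reader = new BufferedReader(
      new InputStreamReader(new S3ObjectInputStreamWrapper(s3Object)));
  try {
    String firstLine = reader.readLine();
  } finally {
    // close() propagates to ensureEverythingIsReleased(), which aborts the
    // stream rather than draining the remaining bytes over HTTP.
    reader.close();
  }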

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index 08fc774..e8ca0c7 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -15,17 +15,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.s3;
 
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.Upload;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Map;
 
 /**
@@ -33,112 +39,109 @@ import java.util.Map;
 * in memory ByteArrayOutputStream before it is finally written to Amazon S3. The size the file is allowed to become
  * is directly controlled by the S3PersistWriter.
  */
-public class S3OutputStreamWrapper extends OutputStream
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class);
-
-    private final AmazonS3Client amazonS3Client;
-    private final String bucketName;
-    private final String path;
-    private final String fileName;
-    private ByteArrayOutputStream outputStream;
-    private final Map<String, String> metaData;
-    private boolean isClosed = false;
-
-    /**
-     * Create an OutputStream Wrapper
-     * @param amazonS3Client
-     * The Amazon S3 Client which will be handling the file
-     * @param bucketName
-     * The Bucket Name you are wishing to write to.
-     * @param path
-     * The path where the object will live
-     * @param fileName
-     * The fileName you ware wishing to write.
-     * @param metaData
-     * Any meta data that is to be written along with the object
-     * @throws IOException
-     * If there is an issue creating the stream, this
-     */
-    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
-        this.amazonS3Client = amazonS3Client;
-        this.bucketName = bucketName;
-        this.path = path;
-        this.fileName = fileName;
-        this.metaData = metaData;
-        this.outputStream = new ByteArrayOutputStream();
+public class S3OutputStreamWrapper extends OutputStream {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class);
+
+  private final AmazonS3Client amazonS3Client;
+  private final String bucketName;
+  private final String path;
+  private final String fileName;
+  private ByteArrayOutputStream outputStream;
+  private final Map<String, String> metaData;
+  private boolean isClosed = false;
+
+  /**
+   * Create an OutputStream Wrapper
+   * @param amazonS3Client
+   * The Amazon S3 Client which will be handling the file
+   * @param bucketName
+   * The Bucket Name you are wishing to write to.
+   * @param path
+   * The path where the object will live
+   * @param fileName
+   * The fileName you wish to write.
+   * @param metaData
+   * Any meta data that is to be written along with the object
+   * @throws IOException
+   * If there is an issue creating the stream, this exception is thrown.
+   */
+  public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
+    this.amazonS3Client = amazonS3Client;
+    this.bucketName = bucketName;
+    this.path = path;
+    this.fileName = fileName;
+    this.metaData = metaData;
+    this.outputStream = new ByteArrayOutputStream();
+  }
+
+  public void write(int byt) throws IOException {
+    this.outputStream.write(byt);
+  }
+
+  public void write(byte[] byt) throws IOException {
+    this.outputStream.write(byt);
+  }
+
+  public void write(byte[] byt, int off, int len) throws IOException {
+    this.outputStream.write(byt, off, len);
+  }
+
+  public void flush() throws IOException {
+    this.outputStream.flush();
+  }
+
+  /**
+   * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
+   * @throws IOException
+   * Exception thrown from the FileOutputStream
+   */
+  public void close() throws IOException {
+    if (!isClosed) {
+      try {
+        this.addFile();
+        this.outputStream.close();
+        this.outputStream = null;
+      } catch (Exception ex) {
+        ex.printStackTrace();
+        LOGGER.warn("There was an error adding the temporaryFile to S3");
+      } finally {
+        // we are done here.
+        this.isClosed = true;
+      }
     }
+  }
 
-    public void write(int b) throws IOException {
-        this.outputStream.write(b);
-    }
+  private void addFile() throws Exception {
 
-    public void write(byte[] b) throws IOException {
-        this.outputStream.write(b);
-    }
+    InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray());
+    int contentLength = outputStream.size();
 
-    public void write(byte[] b, int off, int len) throws IOException {
-        this.outputStream.write(b, off, len);
-    }
+    TransferManager transferManager = new TransferManager(amazonS3Client);
+    ObjectMetadata metadata = new ObjectMetadata();
+    metadata.setExpirationTime(DateTime.now().plusDays(365 * 3).toDate());
+    metadata.setContentLength(contentLength);
 
-    public void flush() throws IOException {
-        this.outputStream.flush();
-    }
+    metadata.addUserMetadata("writer", "org.apache.streams");
 
-    /**
-     * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
-     * @throws IOException
-     * Exception thrown from the FileOutputStream
-     */
-    public void close() throws IOException {
-        if(!isClosed)
-        {
-            try
-            {
-                this.addFile();
-                this.outputStream.close();
-                this.outputStream = null;
-            }
-            catch(Exception e) {
-                e.printStackTrace();
-                LOGGER.warn("There was an error adding the temporaryFile to S3");
-            }
-            finally {
-                // we are done here.
-                this.isClosed = true;
-            }
-        }
+    for (String s : metaData.keySet()) {
+      metadata.addUserMetadata(s, metaData.get(s));
     }
 
-    private void addFile() throws Exception {
-
-        InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray());
-        int contentLength = outputStream.size();
+    String fileNameToWrite = path + fileName;
+    Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata);
+    try {
+      upload.waitForUploadResult();
 
-        TransferManager transferManager = new TransferManager(amazonS3Client);
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate());
-        metadata.setContentLength(contentLength);
-
-        metadata.addUserMetadata("writer", "org.apache.streams");
-
-        for(String s : metaData.keySet())
-            metadata.addUserMetadata(s, metaData.get(s));
-
-        String fileNameToWrite = path + fileName;
-        Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata);
-        try {
-            upload.waitForUploadResult();
-
-            is.close();
-            transferManager.shutdownNow(false);
-            LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName);
-        } catch (Exception e) {
-            // No Op
-        }
+      is.close();
+      transferManager.shutdownNow(false);
+      LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName);
+    } catch (Exception ignored) {
+      LOGGER.trace("Ignoring", ignored);
+    }
 
 
-    }
+  }
 
 
 }
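
Because every write lands in the in-memory buffer and only close() performs the upload via addFile(), a caller looks like this minimal sketch (bucket, path, and file names are placeholders; amazonS3Client is assumed to exist):

  Map<String, String> metaData = new HashMap<String, String>();
  metaData.put("source", "example");

  OutputStream out = new S3OutputStreamWrapper(
      amazonS3Client, "example-bucket", "output/", "records-1.tsv", metaData);
  out.write("id\t2016-11-25T00:00:00Z\t{}\t{\"doc\":true}\n".getBytes());
  // Nothing reaches S3 until close(), which hands the buffered bytes to a TransferManager.
  out.close();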

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 702df71..753b439 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -15,8 +15,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.s3;
 
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentials;
@@ -31,12 +39,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,163 +53,168 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+/**
+ * S3PersistReader reads documents from s3.
+ */
 public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
-    public final static String STREAMS_ID = "S3PersistReader";
-    protected final static char DELIMITER = '\t';
-
-    private S3ReaderConfiguration s3ReaderConfiguration;
-    private AmazonS3Client amazonS3Client;
-    private ObjectMapper mapper = new ObjectMapper();
-    protected LineReadWriteUtil lineReaderUtil;
-    private Collection<String> files;
-    private ExecutorService executor;
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
-    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private Future<?> task;
-
-    public AmazonS3Client getAmazonS3Client() {
-        return this.amazonS3Client;
-    }
-
-    public S3ReaderConfiguration getS3ReaderConfiguration() {
-        return this.s3ReaderConfiguration;
-    }
-
-    public String getBucketName() {
-        return this.s3ReaderConfiguration.getBucket();
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !task.isDone() && !task.isCancelled();
-    }
-
-    public DatumStatusCounter getDatumStatusCounter() {
-        return countersTotal;
+  private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
+  public static final String STREAMS_ID = "S3PersistReader";
+  protected static final char DELIMITER = '\t';
+
+  private S3ReaderConfiguration s3ReaderConfiguration;
+  private AmazonS3Client amazonS3Client;
+  private ObjectMapper mapper = new ObjectMapper();
+  protected LineReadWriteUtil lineReaderUtil;
+  private Collection<String> files;
+  private ExecutorService executor;
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+  protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  private Future<?> task;
+
+  public AmazonS3Client getAmazonS3Client() {
+    return this.amazonS3Client;
+  }
+
+  public S3ReaderConfiguration getS3ReaderConfiguration() {
+    return this.s3ReaderConfiguration;
+  }
+
+  public String getBucketName() {
+    return this.s3ReaderConfiguration.getBucket();
+  }
+
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !task.isDone() && !task.isCancelled();
+  }
+
+  public DatumStatusCounter getDatumStatusCounter() {
+    return countersTotal;
+  }
+
+  public Collection<String> getFiles() {
+    return this.files;
+  }
+
+  public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
+    this.s3ReaderConfiguration = s3ReaderConfiguration;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+    lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
+    // Connect to S3
+    synchronized (this) {
+      // Create the credentials Object
+      AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
+
+      ClientConfiguration clientConfig = new ClientConfiguration();
+      clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));
+
+      // We do not want path style access
+      S3ClientOptions clientOptions = new S3ClientOptions();
+      clientOptions.setPathStyleAccess(false);
+
+      this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+      if ( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) {
+        this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
+      }
+      this.amazonS3Client.setS3ClientOptions(clientOptions);
     }
 
-    public Collection<String> getFiles() {
-        return this.files;
-    }
+    final ListObjectsRequest request = new ListObjectsRequest()
+        .withBucketName(this.s3ReaderConfiguration.getBucket())
+        .withPrefix(s3ReaderConfiguration.getReaderPath())
+        .withMaxKeys(500);
+
+
+    ObjectListing listing = this.amazonS3Client.listObjects(request);
+
+    this.files = new ArrayList<String>();
+
+    /**
+     * If you can list files that are in this path, then you must be dealing with a directory
+     * if you cannot list files that are in this path, then you are most likely dealing with
+     * a simple file.
+     */
+    boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false;
+    boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false;
+
+    if (hasCommonPrefixes || hasObjectSummaries) {
+      // Handle the 'directory' use case
+      do {
+        if (hasCommonPrefixes) {
+          for (String file : listing.getCommonPrefixes()) {
+            this.files.add(file);
+          }
+        } else {
+          for (final S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
+            this.files.add(objectSummary.getKey());
+          }
+        }
 
-    public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
-        this.s3ReaderConfiguration = s3ReaderConfiguration;
+        // get the next batch.
+        listing = this.amazonS3Client.listNextBatchOfObjects(listing);
+      }
+      while (listing.isTruncated());
+    } else {
+      // handle the single file use-case
+      this.files.add(s3ReaderConfiguration.getReaderPath());
     }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+    if (this.files.size() <= 0) {
+      LOGGER.error("There are no files to read");
     }
 
-    public void prepare(Object configurationObject) {
-
-        lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
-        // Connect to S3
-        synchronized (this)
-        {
-            // Create the credentials Object
-            AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
-
-            ClientConfiguration clientConfig = new ClientConfiguration();
-            clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));
-
-            // We do not want path style access
-            S3ClientOptions clientOptions = new S3ClientOptions();
-            clientOptions.setPathStyleAccess(false);
+    this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+    this.executor = Executors.newSingleThreadExecutor();
+  }
 
-            this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
-            if( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion()))
-                this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
-            this.amazonS3Client.setS3ClientOptions(clientOptions);
-        }
-
-        final ListObjectsRequest request = new ListObjectsRequest()
-                .withBucketName(this.s3ReaderConfiguration.getBucket())
-                .withPrefix(s3ReaderConfiguration.getReaderPath())
-                .withMaxKeys(500);
-
-
-        ObjectListing listing = this.amazonS3Client.listObjects(request);
-
-        this.files = new ArrayList<String>();
-
-        /**
-         * If you can list files that are in this path, then you must be dealing with a directory
-         * if you cannot list files that are in this path, then you are most likely dealing with
-         * a simple file.
-         */
-        boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false;
-        boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false;
-
-        if(hasCommonPrefixes || hasObjectSummaries) {
-            // Handle the 'directory' use case
-            do
-            {
-                if(hasCommonPrefixes) {
-                    for (String file : listing.getCommonPrefixes()) {
-                        this.files.add(file);
-                    }
-                } else {
-                    for(final S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
-                        this.files.add(objectSummary.getKey());
-                    }
-                }
-
-                // get the next batch.
-                listing = this.amazonS3Client.listNextBatchOfObjects(listing);
-            } while (listing.isTruncated());
-        }
-        else {
-            // handle the single file use-case
-            this.files.add(s3ReaderConfiguration.getReaderPath());
-        }
-
-        if(this.files.size() <= 0)
-            LOGGER.error("There are no files to read");
-
-        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
-        this.executor = Executors.newSingleThreadExecutor();
-    }
-
-    public void cleanUp() {
-        // no Op
-    }
+  public void cleanUp() {
+    // no Op
+  }
 
-    public StreamsResultSet readAll() {
-        startStream();
-        return new StreamsResultSet(persistQueue);
-    }
+  public StreamsResultSet readAll() {
+    startStream();
+    return new StreamsResultSet(persistQueue);
+  }
 
-    public void startStream() {
-        LOGGER.debug("startStream");
-        task = executor.submit(new S3PersistReaderTask(this));
-    }
+  public void startStream() {
+    LOGGER.debug("startStream");
+    task = executor.submit(new S3PersistReaderTask(this));
+  }
 
-    public StreamsResultSet readCurrent() {
+  @Override
+  public StreamsResultSet readCurrent() {
 
-        StreamsResultSet current;
+    StreamsResultSet current;
 
-        synchronized( S3PersistReader.class ) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            persistQueue.clear();
-        }
-        return current;
+    synchronized ( S3PersistReader.class ) {
+      current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+      current.setCounter(new DatumStatusCounter());
+      current.getCounter().add(countersCurrent);
+      countersTotal.add(countersCurrent);
+      countersCurrent = new DatumStatusCounter();
+      persistQueue.clear();
     }
+    return current;
+  }
 
 }
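
A matching read-side sketch. The configuration setters are assumed to mirror the getters prepare() uses (getBucket, getReaderPath, getKey, getSecretKey); bucket and path are placeholders:

  S3ReaderConfiguration configuration = new S3ReaderConfiguration();
  configuration.setBucket("example-bucket");   // assumed setter
  configuration.setReaderPath("input/");       // a 'directory' prefix or a single file key
  configuration.setKey("AKIAEXAMPLE");
  configuration.setSecretKey("secret");
  // prepare() also reads getProtocol(), so that field must be populated as well

  S3PersistReader s3Reader = new S3PersistReader(configuration);
  s3Reader.prepare(null);                      // lists the files and creates the executor
  StreamsResultSet resultSet = s3Reader.readAll();  // submits S3PersistReaderTask; datums land on the wrapped queue
  // poll resultSet while s3Reader.isRunning(), then call s3Reader.cleanUp() (a no-op here)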

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index f2f5567..f0e9626 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -15,12 +15,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.s3;
 
-import com.google.common.base.Strings;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.util.ComponentUtils;
+
+import com.google.common.base.Strings;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,57 +31,61 @@ import java.io.BufferedReader;
 import java.io.Closeable;
 import java.io.InputStreamReader;
 
+/**
+ * S3PersistReaderTask reads documents from s3 on behalf of
+ * @see org.apache.streams.s3.S3PersistReader
+ */
 public class S3PersistReaderTask implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class);
 
-    private S3PersistReader reader;
+  private S3PersistReader reader;
 
-    public S3PersistReaderTask(S3PersistReader reader) {
-        this.reader = reader;
-    }
+  public S3PersistReaderTask(S3PersistReader reader) {
+    this.reader = reader;
+  }
 
-    @Override
-    public void run() {
-
-        for(String file : reader.getFiles()) {
-
-            // Create our buffered reader
-            S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
-            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
-            LOGGER.info("Reading: {} ", file);
-
-            String line = "";
-            try {
-                while((line = bufferedReader.readLine()) != null) {
-                    if( !Strings.isNullOrEmpty(line) ) {
-                        reader.countersCurrent.incrementAttempt();
-                        StreamsDatum entry = reader.lineReaderUtil.processLine(line);
-                        ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
-                        reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                LOGGER.warn(e.getMessage());
-                reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
-            }
-
-            LOGGER.info("Completed:  " + file);
-
-            try {
-                closeSafely(file, is);
-            } catch (Exception e) {
-                LOGGER.error(e.getMessage());
-            }
+  @Override
+  public void run() {
+
+    for (String file : reader.getFiles()) {
+
+      // Create our buffered reader
+      S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
+      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
+      LOGGER.info("Reading: {} ", file);
+
+      String line = "";
+      try {
+        while ((line = bufferedReader.readLine()) != null) {
+          if ( !Strings.isNullOrEmpty(line) ) {
+            reader.countersCurrent.incrementAttempt();
+            StreamsDatum entry = reader.lineReaderUtil.processLine(line);
+            ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
+            reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+          }
         }
+      } catch (Exception ex) {
+        ex.printStackTrace();
+        LOGGER.warn(ex.getMessage());
+        reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+      }
+
+      LOGGER.info("Completed:  " + file);
+
+      try {
+        closeSafely(file, is);
+      } catch (Exception ex) {
+        LOGGER.error(ex.getMessage());
+      }
     }
+  }
 
-    private static void closeSafely(String file, Closeable closeable) {
-        try {
-            closeable.close();
-        } catch(Exception e) {
-            LOGGER.error("There was an issue closing file: {}", file);
-        }
+  private static void closeSafely(String file, Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (Exception ex) {
+      LOGGER.error("There was an issue closing file: {}", file);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 3686f55..ef6e831 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -15,8 +15,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.s3;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentials;
@@ -28,15 +39,7 @@ import com.amazonaws.services.s3.S3ClientOptions;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,239 +56,256 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable
-{
-    public final static String STREAMS_ID = "S3PersistWriter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
+/**
+ * S3PersistWriter writes documents to s3.
+ */
+public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable {
 
-    private final static char DELIMITER = '\t';
+  public static final String STREAMS_ID = "S3PersistWriter";
 
-    private ObjectMapper objectMapper;
-    private AmazonS3Client amazonS3Client;
-    private S3WriterConfiguration s3WriterConfiguration;
-    private final List<String> writtenFiles = new ArrayList<String>();
-    protected LineReadWriteUtil lineWriterUtil;
+  private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
 
-    private final AtomicLong totalBytesWritten = new AtomicLong();
-    private AtomicLong bytesWrittenThisFile = new AtomicLong();
+  private static final char DELIMITER = '\t';
 
-    private final AtomicInteger totalRecordsWritten = new AtomicInteger();
-    private AtomicInteger fileLineCounter = new AtomicInteger();
+  private ObjectMapper objectMapper;
+  private AmazonS3Client amazonS3Client;
+  private S3WriterConfiguration s3WriterConfiguration;
+  private final List<String> writtenFiles = new ArrayList<String>();
+  protected LineReadWriteUtil lineWriterUtil;
 
-    private Map<String, String> objectMetaData = new HashMap<String, String>() {{
-        put("line[0]", "id");
-        put("line[1]", "timeStamp");
-        put("line[2]", "metaData");
-        put("line[3]", "document");
-    }};
+  private final AtomicLong totalBytesWritten = new AtomicLong();
+  private AtomicLong bytesWrittenThisFile = new AtomicLong();
 
-    private OutputStreamWriter currentWriter = null;
+  private final AtomicInteger totalRecordsWritten = new AtomicInteger();
+  private AtomicInteger fileLineCounter = new AtomicInteger();
 
-    public AmazonS3Client getAmazonS3Client() {
-        return this.amazonS3Client;
-    }
+  private static Map<String, String> objectMetaData = new HashMap<String, String>();
 
-    public S3WriterConfiguration getS3WriterConfiguration() {
-        return this.s3WriterConfiguration;
-    }
+  static {
+    objectMetaData.put("line[0]", "id");
+    objectMetaData.put("line[1]", "timeStamp");
+    objectMetaData.put("line[2]", "metaData");
+    objectMetaData.put("line[3]", "document");
+  }
 
-    public List<String> getWrittenFiles() {
-        return this.writtenFiles;
-    }
+  private OutputStreamWriter currentWriter = null;
 
-    public Map<String, String> getObjectMetaData() {
-        return this.objectMetaData;
-    }
+  public AmazonS3Client getAmazonS3Client() {
+    return this.amazonS3Client;
+  }
 
-    public ObjectMapper getObjectMapper() {
-        return this.objectMapper;
-    }
+  public S3WriterConfiguration getS3WriterConfiguration() {
+    return this.s3WriterConfiguration;
+  }
 
-    public void setObjectMapper(ObjectMapper mapper) {
-        this.objectMapper = mapper;
-    }
+  public List<String> getWrittenFiles() {
+    return this.writtenFiles;
+  }
 
-    public void setObjectMetaData(Map<String, String> val) {
-        this.objectMetaData = val;
-    }
+  public Map<String, String> getObjectMetaData() {
+    return this.objectMetaData;
+  }
 
-    public S3PersistWriter() {
-        this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
-    }
+  public ObjectMapper getObjectMapper() {
+    return this.objectMapper;
+  }
 
-    public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
-        this.s3WriterConfiguration = s3WriterConfiguration;
-    }
+  public void setObjectMapper(ObjectMapper mapper) {
+    this.objectMapper = mapper;
+  }
 
-    /**
-     * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
-     * @param amazonS3Client
-     * If you have an existing amazonS3Client, it wont' bother to create another one
-     * @param s3WriterConfiguration
-     * Configuration of the write paths and instructions are still required.
-     */
-    public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
-        this.amazonS3Client = amazonS3Client;
-        this.s3WriterConfiguration = s3WriterConfiguration;
-    }
+  public void setObjectMetaData(Map<String, String> val) {
+    this.objectMetaData = val;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public S3PersistWriter() {
+    this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
+  }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
-
-        synchronized (this) {
-            // Check to see if we need to reset the file that we are currently working with
-            if (this.currentWriter == null || ( this.bytesWrittenThisFile.get()  >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
-                try {
-                    LOGGER.info("Resetting the file");
-                    this.currentWriter = resetFile();
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-
-            String line = lineWriterUtil.convertResultToString(streamsDatum);
-
-            try {
-                this.currentWriter.write(line);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            // add the bytes we've written
-            int recordSize = line.getBytes().length;
-            this.totalBytesWritten.addAndGet(recordSize);
-            this.bytesWrittenThisFile.addAndGet(recordSize);
-
-            // increment the record count
-            this.totalRecordsWritten.incrementAndGet();
-            this.fileLineCounter.incrementAndGet();
-        }
+  public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
+    this.s3WriterConfiguration = s3WriterConfiguration;
+  }
 
-    }
+  /**
+   * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
+   * @param amazonS3Client
+   * If you have an existing amazonS3Client, it won't bother to create another one
+   * @param s3WriterConfiguration
+   * Configuration of the write paths and instructions are still required.
+   */
+  public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
+    this.amazonS3Client = amazonS3Client;
+    this.s3WriterConfiguration = s3WriterConfiguration;
+  }
 
-    public synchronized OutputStreamWriter resetFile() throws Exception {
-        // this will keep it thread safe, so we don't create too many files
-        if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
-            return this.currentWriter;
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        closeAndDestroyWriter();
+  @Override
+  public void write(StreamsDatum streamsDatum) {
 
-        // Create the path for where the file is going to live.
+    synchronized (this) {
+      // Check to see if we need to reset the file that we are currently working with
+      if (this.currentWriter == null || ( this.bytesWrittenThisFile.get()  >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
         try {
-            // generate a file name
-            String fileName = this.s3WriterConfiguration.getWriterFilePrefix() +
-                    (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv";
-
-            // create the output stream
-            OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client,
-                    this.s3WriterConfiguration.getBucket(),
-                    this.s3WriterConfiguration.getWriterPath(),
-                    fileName,
-                    this.objectMetaData);
-
-            // reset the counter
-            this.fileLineCounter = new AtomicInteger();
-            this.bytesWrittenThisFile = new AtomicLong();
-
-            // add this to the list of written files
-            writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName);
-
-            // Log that we are creating this file
-            LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName);
-
-            // return the output stream
-            return new OutputStreamWriter(outputStream);
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-            throw e;
+          LOGGER.info("Resetting the file");
+          this.currentWriter = resetFile();
+        } catch (Exception ex) {
+          ex.printStackTrace();
         }
-    }
+      }
 
-    private synchronized void closeAndDestroyWriter() {
-        // if there is a current writer, we must close it first.
-        if (this.currentWriter != null) {
-            this.safeFlush(this.currentWriter);
-            this.closeSafely(this.currentWriter);
-            this.currentWriter = null;
+      String line = lineWriterUtil.convertResultToString(streamsDatum);
 
-            // Logging of information to alert the user to the activities of this class
-            LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
-        }
+      try {
+        this.currentWriter.write(line);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+      }
+
+      // add the bytes we've written
+      int recordSize = line.getBytes().length;
+      this.totalBytesWritten.addAndGet(recordSize);
+      this.bytesWrittenThisFile.addAndGet(recordSize);
+
+      // increment the record count
+      this.totalRecordsWritten.incrementAndGet();
+      this.fileLineCounter.incrementAndGet();
     }
 
-    private synchronized void closeSafely(Writer writer)  {
-        if(writer != null) {
-            try {
-                writer.flush();
-                writer.close();
-            } catch(Exception e) {
-                // noOp
-            }
-            LOGGER.debug("File Closed");
-        }
+  }
+
+  /**
+   * Reset File when it's time to create a new file.
+   * @return OutputStreamWriter
+   * @throws Exception Exception
+   */
+  public synchronized OutputStreamWriter resetFile() throws Exception {
+    // this will keep it thread safe, so we don't create too many files
+    if (this.fileLineCounter.get() == 0 && this.currentWriter != null) {
+      return this.currentWriter;
     }
 
-    private void safeFlush(Flushable flushable) {
-        // This is wrapped with a ByteArrayOutputStream, so this is really safe.
-        if(flushable != null) {
-            try {
-                flushable.flush();
-            } catch(IOException e) {
-                // noOp
-            }
-        }
+    closeAndDestroyWriter();
+
+    // Create the path for where the file is going to live.
+    try {
+      // generate a file name
+      String fileName = this.s3WriterConfiguration.getWriterFilePrefix()
+          + (this.s3WriterConfiguration.getChunk() ? "/" : "-")
+          + new Date().getTime()
+          + ".tsv";
+
+      // create the output stream
+      OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client,
+          this.s3WriterConfiguration.getBucket(),
+          this.s3WriterConfiguration.getWriterPath(),
+          fileName,
+          this.objectMetaData);
+
+      // reset the counter
+      this.fileLineCounter = new AtomicInteger();
+      this.bytesWrittenThisFile = new AtomicLong();
+
+      // add this to the list of written files
+      writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName);
+
+      // Log that we are creating this file
+      LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName);
+
+      // return the output stream
+      return new OutputStreamWriter(outputStream);
+    } catch (Exception ex) {
+      LOGGER.error(ex.getMessage());
+      throw ex;
     }
+  }
+
+  private synchronized void closeAndDestroyWriter() {
+    // if there is a current writer, we must close it first.
+    if (this.currentWriter != null) {
+      this.safeFlush(this.currentWriter);
+      this.closeSafely(this.currentWriter);
+      this.currentWriter = null;
 
-    public void prepare(Object configurationObject) {
+      // Log a summary so the user can track this writer's activity.
+      LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size() - 1));
+    }
+  }
+
+  private synchronized void closeSafely(Writer writer)  {
+    if (writer != null) {
+      try {
+        writer.flush();
+        writer.close();
+      } catch (Exception ex) {
+        LOGGER.trace("closeSafely", ex);
+      }
+      LOGGER.debug("File Closed");
+    }
+  }
+
+  private void safeFlush(Flushable flushable) {
+    // The stream is wrapped with a ByteArrayOutputStream, so flushing here is safe.
+    if (flushable != null) {
+      try {
+        flushable.flush();
+      } catch (IOException ex) {
+        LOGGER.trace("safeFlush", ex);
+      }
+    }
+  }
 
-        lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration);
+  @Override
+  public void prepare(Object configurationObject) {
 
-        // Connect to S3
-        synchronized (this) {
+    lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration);
 
-            try {
-                // if the user has chosen to not set the object mapper, then set a default object mapper for them.
-                if (this.objectMapper == null)
-                    this.objectMapper = StreamsJacksonMapper.getInstance();
+    // Connect to S3
+    synchronized (this) {
 
-                // Create the credentials Object
-                if (this.amazonS3Client == null) {
-                    AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
+      try {
+        // if the user has not supplied an object mapper, fall back to the default
+        if (this.objectMapper == null) {
+          this.objectMapper = StreamsJacksonMapper.getInstance();
+        }
 
-                    ClientConfiguration clientConfig = new ClientConfiguration();
-                    clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
+        // Create the credentials Object
+        if (this.amazonS3Client == null) {
+          AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
 
-                    // We do not want path style access
-                    S3ClientOptions clientOptions = new S3ClientOptions();
-                    clientOptions.setPathStyleAccess(false);
+          ClientConfiguration clientConfig = new ClientConfiguration();
+          clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
 
-                    this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
-                    if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion()))
-                        this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
-                    this.amazonS3Client.setS3ClientOptions(clientOptions);
-                }
-            } catch (Exception e) {
-                LOGGER.error("Exception while preparing the S3 client: {}", e);
-            }
+          // We do not want path style access
+          S3ClientOptions clientOptions = new S3ClientOptions();
+          clientOptions.setPathStyleAccess(false);
 
-            Preconditions.checkArgument(this.amazonS3Client != null);
+          this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+          if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) {
+            this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
+          }
+          this.amazonS3Client.setS3ClientOptions(clientOptions);
         }
-    }
-
-    public void cleanUp() {
-        closeAndDestroyWriter();
-    }
+      } catch (Exception ex) {
+        LOGGER.error("Exception while preparing the S3 client: {}", ex);
+      }
 
-    public DatumStatusCounter getDatumStatusCounter() {
-        DatumStatusCounter counters = new DatumStatusCounter();
-        counters.incrementAttempt(this.totalRecordsWritten.get());
-        counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get());
-        return counters;
+      Preconditions.checkArgument(this.amazonS3Client != null);
     }
+  }
+
+  @Override
+  public void cleanUp() {
+    closeAndDestroyWriter();
+  }
+
+  @Override
+  public DatumStatusCounter getDatumStatusCounter() {
+    DatumStatusCounter counters = new DatumStatusCounter();
+    counters.incrementAttempt(this.totalRecordsWritten.get());
+    counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get());
+    return counters;
+  }
 }
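
The rotation pattern in resetFile() above (close the current writer, open a new
timestamped file, reset the per-file counters) generalizes beyond S3. Below is a
minimal local-filesystem sketch of the same technique; RotatingLineWriter, prefix,
and maxLinesPerFile are illustrative names, not part of the Streams codebase.

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.Date;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RotatingLineWriter {

      private final String prefix;
      private final int maxLinesPerFile;
      private AtomicInteger lineCounter = new AtomicInteger();
      private Writer currentWriter;

      public RotatingLineWriter(String prefix, int maxLinesPerFile) {
        this.prefix = prefix;
        this.maxLinesPerFile = maxLinesPerFile;
      }

      public synchronized void write(String line) throws IOException {
        // rotate when no file is open yet, or the current file is full
        if (currentWriter == null || lineCounter.get() >= maxLinesPerFile) {
          if (currentWriter != null) {
            currentWriter.flush();
            currentWriter.close();
          }
          // timestamped name, mirroring the prefix + epoch-millis + ".tsv" scheme above
          currentWriter = new FileWriter(prefix + "-" + new Date().getTime() + ".tsv");
          lineCounter = new AtomicInteger();
        }
        currentWriter.write(line);
        lineCounter.incrementAndGet();
      }
    }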

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index 8793333..43d9e34 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -18,102 +18,100 @@
 
 package org.apache.streams.console;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.core.StreamsResultSet;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.InputStream;
-import java.io.PrintStream;
 import java.math.BigInteger;
 import java.util.Queue;
 import java.util.Scanner;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+/**
+ * ConsolePersistReader reads documents from stdin.
+ */
 public class ConsolePersistReader implements StreamsPersistReader {
 
-    private final static String STREAMS_ID = "ConsolePersistReader";
+  private static final String STREAMS_ID = "ConsolePersistReader";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    protected InputStream inputStream = System.in;
+  protected InputStream inputStream = System.in;
 
-    public ConsolePersistReader() {
-        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
+  public ConsolePersistReader() {
+    this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+  }
 
-    public ConsolePersistReader(InputStream inputStream) {
-        this();
-        this.inputStream = inputStream;
-    }
+  public ConsolePersistReader(InputStream inputStream) {
+    this();
+    this.inputStream = inputStream;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    public void prepare(Object o) {
+  @Override
+  public void prepare(Object configuration) {
 
-    }
+  }
 
-    public void cleanUp() {
+  @Override
+  public void cleanUp() {
 
-    }
+  }
 
-    @Override
-    public void startStream() {
-        // no op
-    }
+  @Override
+  public void startStream() {
+    // no op
+  }
 
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
-    }
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
 
-    @Override
-    public StreamsResultSet readCurrent() {
+  @Override
+  public StreamsResultSet readCurrent() {
 
-        LOGGER.info("{} readCurrent", STREAMS_ID);
+    LOGGER.info("{} readCurrent", STREAMS_ID);
 
-        Scanner sc = new Scanner(inputStream);
+    Scanner sc = new Scanner(inputStream);
 
-        while( sc.hasNextLine() ) {
+    while ( sc.hasNextLine() ) {
 
-            persistQueue.offer(new StreamsDatum(sc.nextLine()));
+      persistQueue.offer(new StreamsDatum(sc.nextLine()));
 
-        }
+    }
 
-        LOGGER.info("Providing {} docs", persistQueue.size());
+    LOGGER.info("Providing {} docs", persistQueue.size());
 
-        StreamsResultSet result =  new StreamsResultSet(persistQueue);
+    StreamsResultSet result =  new StreamsResultSet(persistQueue);
 
-        LOGGER.info("{} Exiting", STREAMS_ID);
+    LOGGER.info("{} Exiting", STREAMS_ID);
 
-        return result;
+    return result;
 
-    }
+  }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return readCurrent();
-    }
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return readCurrent();
+  }
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return readCurrent();
-    }
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return readCurrent();
+  }
 
-    @Override
-    public boolean isRunning() {
-        return true;  //Will always be running
-    }
+  @Override
+  public boolean isRunning() {
+    return true; // this reader is always running
+  }
 }
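
Because the reader accepts an arbitrary InputStream, readCurrent() is easy to exercise
without a console. A minimal usage sketch, assuming StreamsResultSet can be iterated
for its datums; the sample input and class name are illustrative.

    import org.apache.streams.console.ConsolePersistReader;
    import org.apache.streams.core.StreamsDatum;

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    public class ConsolePersistReaderExample {
      public static void main(String[] args) {
        // two documents, one per line, exactly as readCurrent() consumes them
        String input = "{\"id\":1}\n{\"id\":2}\n";
        ConsolePersistReader reader = new ConsolePersistReader(
            new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
        reader.prepare(null);
        for (StreamsDatum datum : reader.readCurrent()) {
          System.out.println(datum.getDocument());
        }
        reader.cleanUp();
      }
    }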

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 6d284ba..6358071 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -18,12 +18,14 @@
 
 package org.apache.streams.console;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,53 +33,56 @@ import java.io.PrintStream;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+/**
+ * ConsolePersistWriter writes documents to stdout.
+ */
 public class ConsolePersistWriter implements StreamsPersistWriter {
 
-    private final static String STREAMS_ID = "ConsolePersistWriter";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
+  private static final String STREAMS_ID = "ConsolePersistWriter";
 
-    protected PrintStream printStream = System.out;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected PrintStream printStream = System.out;
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    public ConsolePersistWriter() {
-        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public ConsolePersistWriter(PrintStream printStream) {
-        this();
-        this.printStream = printStream;
-    }
+  public ConsolePersistWriter() {
+    this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public ConsolePersistWriter(PrintStream printStream) {
+    this();
+    this.printStream = printStream;
+  }
 
-    public void prepare(Object o) {
-        Preconditions.checkNotNull(persistQueue);
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    public void cleanUp() {
+  @Override
+  public void prepare(Object configuration) {
+    Preconditions.checkNotNull(persistQueue);
+  }
 
-    }
+  @Override
+  public void cleanUp() {
 
-    @Override
-    public void write(StreamsDatum entry) {
+  }
 
-        try {
+  @Override
+  public void write(StreamsDatum entry) {
 
-            String text = mapper.writeValueAsString(entry);
+    try {
 
-            printStream.println(text);
+      String text = mapper.writeValueAsString(entry);
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("save: {}", e);
-        }
+      printStream.println(text);
 
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn("save", ex);
     }
 
+  }
+
 }
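
The writer is symmetric: constructing it with a PrintStream lets callers capture output
in memory rather than writing to stdout. A short sketch; the class name is illustrative.

    import org.apache.streams.console.ConsolePersistWriter;
    import org.apache.streams.core.StreamsDatum;

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class ConsolePersistWriterExample {
      public static void main(String[] args) {
        // capture output in memory instead of writing to stdout
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ConsolePersistWriter writer = new ConsolePersistWriter(new PrintStream(buffer));
        writer.prepare(null);
        writer.write(new StreamsDatum("{\"id\":1}"));
        writer.cleanUp();
        System.out.print(buffer.toString());  // the datum serialized as JSON
      }
    }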

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
index e5009f0..ecb60e3 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
@@ -19,36 +19,43 @@
 package org.apache.streams.console;
 
 import org.apache.streams.core.StreamsDatum;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
+/**
+ * ConsolePersistWriterTask writes documents to stdout on behalf of
+ * {@link org.apache.streams.console.ConsolePersistWriter}.
+ */
 public class ConsolePersistWriterTask implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriterTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriterTask.class);
 
-    private ConsolePersistWriter writer;
+  private ConsolePersistWriter writer;
 
-    public ConsolePersistWriterTask(ConsolePersistWriter writer) {
-        this.writer = writer;
-    }
+  public ConsolePersistWriterTask(ConsolePersistWriter writer) {
+    this.writer = writer;
+  }
 
-    @Override
-    public void run() {
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {}
+  @Override
+  public void run() {
+    while (true) {
+      if ( writer.persistQueue.peek() != null ) {
+        try {
+          StreamsDatum entry = writer.persistQueue.remove();
+          writer.write(entry);
+        } catch (Exception ex) {
+          LOGGER.error("Exception writing entry", ex);
         }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(100));
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupted", interrupt);
+        // restore the interrupt flag and exit so the task can actually be stopped
+        Thread.currentThread().interrupt();
+        return;
+      }
     }
+  }
 
 }
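
Since the task polls writer.persistQueue on its own thread, a driver only needs to start
the Runnable and feed the queue. A sketch, placed in the org.apache.streams.console
package so the package-visible persistQueue field is reachable; the daemon flag and the
sleep duration are illustrative choices.

    package org.apache.streams.console;

    import org.apache.streams.core.StreamsDatum;

    public class ConsolePersistWriterTaskExample {
      public static void main(String[] args) throws InterruptedException {
        ConsolePersistWriter writer = new ConsolePersistWriter();
        writer.prepare(null);

        // the task loops forever draining writer.persistQueue; daemon lets the JVM exit
        Thread taskThread = new Thread(new ConsolePersistWriterTask(writer));
        taskThread.setDaemon(true);
        taskThread.start();

        writer.persistQueue.offer(new StreamsDatum("{\"id\":1}"));

        Thread.sleep(500);  // give the task a chance to drain and print the datum
      }
    }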

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
index 0b2b782..8c7f724 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
@@ -21,21 +21,24 @@ package org.apache.streams.elasticsearch;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Client;
 
+/**
+ * Wrapper class for a client with a known version.
+ */
 public class ElasticsearchClient {
 
-    private Client client;
-    private Version version;
+  private Client client;
+  private Version version;
 
-    public ElasticsearchClient(Client client, Version version) {
-        this.client = client;
-        this.version = version;
-    }
+  public ElasticsearchClient(Client client, Version version) {
+    this.client = client;
+    this.version = version;
+  }
 
-    public Client getClient() {
-        return client;
-    }
+  public Client getClient() {
+    return client;
+  }
 
-    public Version getVersion() {
-        return version;
-    }
+  public Version getVersion() {
+    return version;
+  }
 }
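
The value of the wrapper is that callers can branch on cluster capabilities without
re-querying cluster state. A small sketch, assuming an already-connected Client;
Version.V_2_0_0 is a constant in the elasticsearch client library of this generation,
and the class and method names are illustrative.

    import org.apache.streams.elasticsearch.ElasticsearchClient;

    import org.elasticsearch.Version;
    import org.elasticsearch.client.Client;

    public class ElasticsearchClientExample {

      // pair a connected client with its detected cluster version
      public static ElasticsearchClient wrap(Client client, Version version) {
        return new ElasticsearchClient(client, version);
      }

      // branch on capabilities; the version may be null if detection failed upstream
      public static boolean isVersion2OrLater(ElasticsearchClient wrapped) {
        return wrapped.getVersion() != null && wrapped.getVersion().onOrAfter(Version.V_2_0_0);
      }
    }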

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 4809334..bdff9aa 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -19,6 +19,7 @@
 package org.apache.streams.elasticsearch;
 
 import com.google.common.net.InetAddresses;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
@@ -41,157 +42,154 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+/**
+ * Wrapper class managing multiple
+ * {@link org.apache.streams.elasticsearch.ElasticsearchClient} instances.
+ */
 public class ElasticsearchClientManager {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);
-    private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>();
-
-    private ElasticsearchConfiguration elasticsearchConfiguration;
-
-    public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) {
-        this.elasticsearchConfiguration = elasticsearchConfiguration;
-    }
 
-    public ElasticsearchConfiguration getElasticsearchConfiguration() {
-        return elasticsearchConfiguration;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);
+  private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>();
+
+  private ElasticsearchConfiguration elasticsearchConfiguration;
+
+  public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) {
+    this.elasticsearchConfiguration = elasticsearchConfiguration;
+  }
+
+  public ElasticsearchConfiguration getElasticsearchConfiguration() {
+    return elasticsearchConfiguration;
+  }
+
+  /**
+   * Get the Client. It is actually a transport client, but it is much easier to work
+   * with the generic interface, which is unlikely to change in elasticsearch.
+   * The underlying client creation is synchronized to block threads from creating
+   * too many clients at any given time.
+   *
+   * @return Client for elasticsearch
+   */
+  public Client getClient() {
+    checkAndLoadClient(null);
+
+    return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
+  }
+
+  /**
+   * Returns the Client for the named cluster.
+   * @param clusterName name of the cluster
+   * @return Client for elasticsearch
+   */
+  public Client getClient(String clusterName) {
+    checkAndLoadClient(clusterName);
+
+    return ALL_CLIENTS.get(clusterName).getClient();
+  }
+
+  public boolean isOnOrAfterVersion(Version version) {
+    return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
+  }
+
+  public boolean refresh(String index) {
+    return refresh(new String[]{index});
+  }
+
+  public boolean refresh(String[] indexes) {
+    RefreshResponse refreshResponse = this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+    return refreshResponse.getFailedShards() == 0;
+  }
+
+  /**
+   * Terminate the elasticsearch clients.
+   */
+  public synchronized void stop() {
+    // Check to see if we have a client.
+    if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
+      // Close the client
+      ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
+
+      // Remove it so that it isn't in memory any more.
+      ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
     }
+  }
 
-    /**
-     * ***********************************************************************************
-     * Get the Client for this return, it is actually a transport client, but it is much
-     * easier to work with the generic object as this interface likely won't change from
-     * elasticsearch. This method is synchronized to block threads from creating
-     * too many of these at any given time.
-     *
-     * @return Client for elasticsearch
-     * ***********************************************************************************
-     */
-    public Client getClient() {
-        checkAndLoadClient(null);
-
-        return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
-    }
+  public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException {
+    ClusterHealthRequestBuilder request = this.getClient().admin().cluster().prepareHealth();
+    return request.execute().get();
+  }
 
-    public Client getClient(String clusterName) {
-        checkAndLoadClient(clusterName);
+  public String toString() {
+    return ToStringBuilder.reflectionToString(this);
+  }
 
-        return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
-    }
+  public boolean equals(Object other) {
+    return EqualsBuilder.reflectionEquals(this, other, Collections.singletonList(this.elasticsearchConfiguration.toString()));
+  }
 
-    public boolean isOnOrAfterVersion(Version version) {
-        return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
-    }
+  public int hashCode() {
+    return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString()));
+  }
 
-    public void start() throws Exception {
-        /*
-         * Note:
-         * Everything in these classes is being switched to lazy loading. Within
-         * Heroku you only have 60 seconds to connect, and bind to the service,
-         * and you are only allowed to run in 1Gb of memory. Switching all
-         * of this to lazy loading is how we are fixing some of the issues
-         * if you are having issues with these classes, please, refactor
-         * and create a UNIT TEST CASE!!!!!! To ensure that everything is
-         * working before you check it back in.
-         *
-         * Author: Smashew @ 2013-08-26
-         **********************************************************************/
-    }
+  private synchronized void checkAndLoadClient(String clusterName) {
 
-    public boolean refresh(String index) {
-        return refresh(new String[]{index});
+    if (clusterName == null) {
+      clusterName = this.elasticsearchConfiguration.getClusterName();
     }
 
-    public boolean refresh(String[] indexes) {
-        RefreshResponse refreshResponse = this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
-        return refreshResponse.getFailedShards() == 0;
+    // If it is there, exit early
+    if (ALL_CLIENTS.containsKey(clusterName)) {
+      return;
     }
 
-    public synchronized void stop() {
-        // Terminate the elasticsearch cluster
-        // Check to see if we have a client.
-        if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
-            // Close the client
-            ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
-
-            // Remove it so that it isn't in memory any more.
-            ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
+    try {
+      // We are currently using lazy loading to create the elasticsearch client.
+      LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts());
+
+      Settings settings = Settings.settingsBuilder()
+          .put("cluster.name", this.elasticsearchConfiguration.getClusterName())
+          .put("client.transport.ping_timeout", "90s")
+          .put("client.transport.nodes_sampler_interval", "60s")
+          .build();
+
+      // Create the client
+      TransportClient transportClient = TransportClient.builder().settings(settings).build();
+      for (String h : elasticsearchConfiguration.getHosts()) {
+        LOGGER.info("Adding Host: {}", h);
+        InetAddress address;
+
+        if ( InetAddresses.isInetAddress(h)) {
+          LOGGER.info("{} is an IP address", h);
+          address = InetAddresses.forString(h);
+        } else {
+          LOGGER.info("{} is a hostname", h);
+          address = InetAddress.getByName(h);
         }
-    }
+        transportClient.addTransportAddress(
+            new InetSocketTransportAddress(
+                address,
+                elasticsearchConfiguration.getPort().intValue()));
+      }
 
-    public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException {
-        ClusterHealthRequestBuilder request = this.getClient().admin().cluster().prepareHealth();
-        return request.execute().get();
-    }
+      // Add the client and figure out the version.
+      ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transportClient, getVersion(transportClient));
 
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this);
-    }
+      // Add it to our static map
+      ALL_CLIENTS.put(clusterName, elasticsearchClient);
 
-    public boolean equals(Object o) {
-        return EqualsBuilder.reflectionEquals(this, o, Collections.singletonList(this.elasticsearchConfiguration.toString()));
+    } catch (Exception ex) {
+      LOGGER.error("Could not Create elasticsearch Transport Client: {}", ex);
     }
 
-    public int hashCode() {
-        return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString()));
-    }
+  }
 
-    private synchronized void checkAndLoadClient(String clusterName) {
-
-        if (clusterName == null)
-            clusterName = this.elasticsearchConfiguration.getClusterName();
-
-        // If it is there, exit early
-        if (ALL_CLIENTS.containsKey(clusterName))
-            return;
-
-        try {
-            // We are currently using lazy loading to start the elasticsearch cluster, however.
-            LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts());
-
-            Settings settings = Settings.settingsBuilder()
-                    .put("cluster.name", this.elasticsearchConfiguration.getClusterName())
-                    .put("client.transport.ping_timeout", "90s")
-                    .put("client.transport.nodes_sampler_interval", "60s")
-                    .build();
-
-
-            // Create the client
-            TransportClient transportClient = TransportClient.builder().settings(settings).build();
-            for (String h : elasticsearchConfiguration.getHosts()) {
-                LOGGER.info("Adding Host: {}", h);
-                InetAddress address;
-
-                if( InetAddresses.isInetAddress(h)) {
-                    LOGGER.info("{} is an IP address", h);
-                    address = InetAddresses.forString(h);
-                } else {
-                    LOGGER.info("{} is a hostname", h);
-                    address = InetAddress.getByName(h);
-                }
-                transportClient.addTransportAddress(
-                        new InetSocketTransportAddress(
-                                address,
-                                elasticsearchConfiguration.getPort().intValue()));
-            }
-            // Add the client and figure out the version.
-            ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transportClient, getVersion(transportClient));
-
-            // Add it to our static map
-            ALL_CLIENTS.put(clusterName, elasticsearchClient);
-
-        } catch (Exception e) {
-            LOGGER.error("Could not Create elasticsearch Transport Client: {}", e);
-        }
+  private Version getVersion(Client client) {
+    try {
+      ClusterStateRequestBuilder clusterStateRequestBuilder = client.admin().cluster().prepareState();
+      ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
 
+      return clusterStateResponse.getState().getNodes().getMasterNode().getVersion();
+    } catch (Exception ex) {
+      LOGGER.trace("getVersion", ex);
+      return null;
     }
-
-    private Version getVersion(Client client) {
-        try {
-            ClusterStateRequestBuilder clusterStateRequestBuilder = client.admin().cluster().prepareState();
-            ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
-
-            return clusterStateResponse.getState().getNodes().getMasterNode().getVersion();
-        } catch (Exception e) {
-            return null;
-        }
-    }
+  }
 }
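
End to end, the manager is driven by an ElasticsearchConfiguration and hands back cached
clients. A usage sketch; the host, port, cluster name, and the setter names on the
generated ElasticsearchConfiguration bean are assumptions.

    import org.apache.streams.elasticsearch.ElasticsearchClientManager;
    import org.apache.streams.elasticsearch.ElasticsearchConfiguration;

    import org.elasticsearch.client.Client;

    import java.util.Collections;

    public class ElasticsearchClientManagerExample {
      public static void main(String[] args) throws Exception {
        // connection details are illustrative; setters assumed from the generated bean
        ElasticsearchConfiguration config = new ElasticsearchConfiguration();
        config.setHosts(Collections.singletonList("localhost"));
        config.setPort(9300L);
        config.setClusterName("elasticsearch");

        ElasticsearchClientManager manager = new ElasticsearchClientManager(config);

        // the first call lazily creates the TransportClient; later calls reuse the cache
        Client client = manager.getClient();
        System.out.println("connected: " + (client != null));
        System.out.println("cluster status: " + manager.getStatus().getStatus());

        manager.stop();  // close and evict the cached client
      }
    }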