Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:15 UTC

[34/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance
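
The hunks below are almost entirely mechanical: the custom checkstyle.xml evidently enforces two-space indentation, "public static final" modifier order, mandatory braces on single-statement branches, class-level javadoc, catch parameters named "ex" rather than "e", and import grouping that places org.apache.streams imports ahead of third-party and java.* imports. A minimal sketch of code that satisfies those rules (not part of this commit; the class name and log calls are illustrative only):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checkstyle-compliant class: class-level javadoc is required.
 */
public class CheckstyleExample {

  // modifier order is 'public static final', not 'public final static'
  public static final String STREAMS_ID = "CheckstyleExample";

  private static final Logger LOGGER = LoggerFactory.getLogger(CheckstyleExample.class);

  /**
   * Two-space indentation; braces even for single-statement branches.
   */
  public void write(String line) {
    if (line == null) {
      return;
    }
    try {
      LOGGER.info("writing {}", line);
    } catch (RuntimeException ex) {
      // catch parameters are renamed from 'e' to 'ex'
      LOGGER.warn("write failed", ex);
    }
  }
}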

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 492eccb..4554c0f 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -18,13 +18,6 @@
 
 package org.apache.streams.hdfs;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.LineReadWriteUtil;
@@ -34,6 +27,15 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,249 +52,278 @@ import java.util.List;
 import java.util.Queue;
 import java.util.zip.GZIPOutputStream;
 
+/**
+ * WebHdfsPersistWriter writes to hdfs.
+ */
 public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
 
-    public final static String STREAMS_ID = "WebHdfsPersistWriter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
-
-    private FileSystem client;
-    private Path path;
-    private int linesPerFile;
-    private int totalRecordsWritten = 0;
-    private final List<Path> writtenFiles = new ArrayList<Path>();
-    private int fileLineCounter = 0;
-    private OutputStreamWriter currentWriter = null;
-
-    private static final int BYTES_IN_MB = 1024 * 1024;
-    private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
-    private volatile int totalByteCount = 0;
-    private volatile int byteCount = 0;
-
-    public boolean terminate = false;
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    private ObjectMapper mapper;
-    private LineReadWriteUtil lineWriterUtil;
-
-    protected HdfsWriterConfiguration hdfsConfiguration;
-
-    public WebHdfsPersistWriter() {
-        this(new ComponentConfigurator<>(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
-    }
-
-    public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
-        this.hdfsConfiguration = hdfsConfiguration;
-        this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
+  public static final String STREAMS_ID = "WebHdfsPersistWriter";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
+
+  private FileSystem client;
+  private Path path;
+  private int linesPerFile;
+  private int totalRecordsWritten = 0;
+  private final List<Path> writtenFiles = new ArrayList<Path>();
+  private int fileLineCounter = 0;
+  private OutputStreamWriter currentWriter = null;
+
+  private static final int BYTES_IN_MB = 1024 * 1024;
+  private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
+  private volatile int totalByteCount = 0;
+  private volatile int byteCount = 0;
+
+  public boolean terminate = false;
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper;
+  private LineReadWriteUtil lineWriterUtil;
+
+  protected HdfsWriterConfiguration hdfsConfiguration;
+
+  public WebHdfsPersistWriter() {
+    this(new ComponentConfigurator<>(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
+  }
+
+  public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
+    this.hdfsConfiguration = hdfsConfiguration;
+    this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
+  }
+
+  /**
+   * getURI from hdfsConfiguration.
+   * @return URI
+   * @throws URISyntaxException URISyntaxException
+   */
+  // TODO: combine with WebHdfsPersistReader.getURI
+  public URI getURI() throws URISyntaxException {
+    StringBuilder uriBuilder = new StringBuilder();
+    uriBuilder.append(hdfsConfiguration.getScheme());
+    uriBuilder.append("://");
+    if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+      uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+    } else {
+      uriBuilder.append("/");
     }
-
-    public URI getURI() throws URISyntaxException {
-        StringBuilder uriBuilder = new StringBuilder();
-        uriBuilder.append(hdfsConfiguration.getScheme());
-        uriBuilder.append("://");
-        if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost()))
-            uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
-        else
-            uriBuilder.append("/");
-        return new URI(uriBuilder.toString());
+    return new URI(uriBuilder.toString());
+  }
+
+  /**
+   * isConnected.
+   * @return true if connected, false otherwise
+   */
+  // TODO: combine with WebHdfsPersistReader.isConnected
+  public boolean isConnected() {
+    return (client != null);
+  }
+
+  /**
+   * getFileSystem.
+   * @return FileSystem
+   */
+  // TODO: combine with WebHdfsPersistReader.getFileSystem
+  public final synchronized FileSystem getFileSystem() {
+    // Check to see if we are connected.
+    if (!isConnected()) {
+      connectToWebHDFS();
     }
-
-    public boolean isConnected() {
-        return (client != null);
-    }
-
-    public final synchronized FileSystem getFileSystem() {
-        // Check to see if we are connected.
-        if (!isConnected())
-            connectToWebHDFS();
-        return this.client;
-    }
-
-    private synchronized void connectToWebHDFS() {
-        try {
-            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
-            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
-            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
-
-            ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                public Void run() throws Exception {
-                    Configuration conf = new Configuration();
-                    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-                    LOGGER.info("WebURI : {}", getURI().toString());
-                    client = FileSystem.get(getURI(), conf);
-                    LOGGER.info("Connected to WebHDFS");
-
-                    /*
-                    * ************************************************************************************************
-                    * This code is an example of how you would work with HDFS and you weren't going over
-                    * the webHDFS protocol.
-                    *
-                    * Smashew: 2013-10-01
-                    * ************************************************************************************************
-                    conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
-                    conf.set("namenode.host","0.0.0.0");
-                    conf.set("hadoop.job.ugi", userName);
-                    conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
-                    fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
-                    FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
-                    for(int i=0;i<status.length;i++)
-                    {
-                        LOGGER.info("Directory: {}", status[i].getPath());
-                    }
-                    */
-                    return null;
-                }
-            });
-        } catch (Exception e) {
-            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
-            throw new RuntimeException(e);
+    return this.client;
+  }
+
+  private synchronized void connectToWebHDFS() {
+    try {
+      LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
+      ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
+
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          Configuration conf = new Configuration();
+          conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+          LOGGER.info("WebURI : {}", getURI().toString());
+          client = FileSystem.get(getURI(), conf);
+          LOGGER.info("Connected to WebHDFS");
+
+          /*
+          * ************************************************************************************************
+          * This code is an example of how you would work with HDFS if you weren't going over
+          * the webHDFS protocol.
+          *
+          * Smashew: 2013-10-01
+          * ************************************************************************************************
+          conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
+          conf.set("namenode.host","0.0.0.0");
+          conf.set("hadoop.job.ugi", userName);
+          conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
+          fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
+          FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
+          for(int i=0;i<status.length;i++)
+          {
+              LOGGER.info("Directory: {}", status[i].getPath());
+          }
+          */
+
+          return null;
         }
+      });
+    } catch (Exception ex) {
+      LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", ex);
+      throw new RuntimeException(ex);
     }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public void write(StreamsDatum streamsDatum) {
-
-        synchronized (this) {
-            // Check to see if we need to reset the file that we are currently working with
-            if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
-                resetFile();
-
-            String line = lineWriterUtil.convertResultToString(streamsDatum);
-            writeInternal(line);
-            if( !line.endsWith(this.hdfsConfiguration.getLineDelimiter()))
-                writeInternal(this.hdfsConfiguration.getLineDelimiter());
-            int bytesInLine = line.getBytes().length;
-
-            totalRecordsWritten++;
-            totalByteCount += bytesInLine;
-            byteCount += bytesInLine;
-
-            if (byteCount > BYTES_BEFORE_FLUSH)
-                try {
-                    flush();
-                } catch (IOException e) {
-                    LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution.  WARNING: There could be data loss.", e);
-                }
-
-            this.fileLineCounter++;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void write(StreamsDatum streamsDatum) {
+
+    synchronized (this) {
+      // Check to see if we need to reset the file that we are currently working with
+      if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile)) {
+        resetFile();
+      }
+      String line = lineWriterUtil.convertResultToString(streamsDatum);
+      writeInternal(line);
+      if ( !line.endsWith(this.hdfsConfiguration.getLineDelimiter())) {
+        writeInternal(this.hdfsConfiguration.getLineDelimiter());
+      }
+      int bytesInLine = line.getBytes().length;
+
+      totalRecordsWritten++;
+      totalByteCount += bytesInLine;
+      byteCount += bytesInLine;
+
+      if (byteCount > BYTES_BEFORE_FLUSH) {
+        try {
+          flush();
+        } catch (IOException ex) {
+          LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution.  WARNING: There could be data loss.", ex);
         }
+      }
+      this.fileLineCounter++;
     }
-
-    private void writeInternal(String line) {
+  }
+
+  private void writeInternal(String line) {
+    try {
+      this.currentWriter.write(line);
+    } catch (IOException ex) {
+      LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", ex);
+      try {
+        resetFile();
+        this.currentWriter.write(line);
+      } catch (Exception e2) {
+        LOGGER.warn("Failed to write even after creating a new file.  Attempting to reconnect", e2);
         try {
-            this.currentWriter.write(line);
-        } catch (IOException e) {
-            LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", e);
-            try{
-                resetFile();
-                this.currentWriter.write(line);
-            } catch (Exception io) {
-                LOGGER.warn("Failed to write even after creating a new file.  Attempting to reconnect", io);
-                try {
-                    connectToWebHDFS();
-                    resetFile();
-                    this.currentWriter.write(line);
-                } catch (Exception ex) {
-                    LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", ex);
-                    throw new RuntimeException(e);
-                }
-            }
-
+          connectToWebHDFS();
+          resetFile();
+          this.currentWriter.write(line);
+        } catch (Exception e3) {
+          LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", e3);
+          throw new RuntimeException(e3);
         }
-    }
+      }
 
-    public void flush() throws IOException {
-        if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
-            this.currentWriter.flush();
-            byteCount = 0;
-        }
     }
+  }
 
-    private synchronized void resetFile() {
-        // this will keep it thread safe, so we don't create too many files
-        if (this.fileLineCounter == 0 && this.currentWriter != null)
-            return;
+  @Override
+  public void flush() throws IOException {
+    if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
+      this.currentWriter.flush();
+      byteCount = 0;
+    }
+  }
 
-        // Create the path for where the file is going to live.
-        Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
+  private synchronized void resetFile() {
+    // this will keep it thread safe, so we don't create too many files
+    if (this.fileLineCounter == 0 && this.currentWriter != null) {
+      return;
+    }
 
-        if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP))
-            filePath = filePath.suffix(".gz");
-        else
-            filePath = filePath.suffix(".tsv");
+    // Create the path for where the file is going to live.
+    Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
 
-        try {
+    if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
+      filePath = filePath.suffix(".gz");
+    } else {
+      filePath = filePath.suffix(".tsv");
+    }
 
-            // if there is a current writer, we must close it first.
-            if (this.currentWriter != null) {
-                flush();
-                close();
-            }
+    try {
 
-            this.fileLineCounter = 0;
+      // if there is a current writer, we must close it first.
+      if (this.currentWriter != null) {
+        flush();
+        close();
+      }
 
-            // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
-            if (client.exists(filePath))
-                throw new RuntimeException("Unable to create file: " + filePath);
+      this.fileLineCounter = 0;
 
-            if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP))
-                this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath)));
-            else
-                this.currentWriter = new OutputStreamWriter(client.create(filePath));
+      // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
+      if (client.exists(filePath)) {
+        throw new RuntimeException("Unable to create file: " + filePath);
+      }
 
-            // Add another file to the list of written files.
-            writtenFiles.add(filePath);
+      if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
+        this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath)));
+      } else {
+        this.currentWriter = new OutputStreamWriter(client.create(filePath));
+      }
 
-            LOGGER.info("File Created: {}", filePath);
-        } catch (Exception e) {
-            LOGGER.error("COULD NOT CreateFile: {}", filePath);
-            LOGGER.error(e.getMessage());
-            throw new RuntimeException(e);
-        }
-    }
+      // Add another file to the list of written files.
+      writtenFiles.add(filePath);
 
-    public synchronized void close() throws IOException {
-        if (this.currentWriter != null) {
-            this.currentWriter.flush();
-            this.currentWriter.close();
-            this.currentWriter = null;
-            LOGGER.info("File Closed");
-        }
+      LOGGER.info("File Created: {}", filePath);
+    } catch (Exception ex) {
+      LOGGER.error("COULD NOT CreateFile: {}", filePath);
+      LOGGER.error(ex.getMessage());
+      throw new RuntimeException(ex);
     }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-        lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
-        connectToWebHDFS();
-        path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (this.currentWriter != null) {
+      this.currentWriter.flush();
+      this.currentWriter.close();
+      this.currentWriter = null;
+      LOGGER.info("File Closed");
     }
-
-    @Override
-    public void cleanUp() {
-        try {
-            flush();
-        } catch (IOException e) {
-            LOGGER.error("Error flushing on cleanup", e);
-        }
-        try {
-            close();
-        } catch (IOException e) {
-            LOGGER.error("Error closing on cleanup", e);
-        }
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    mapper = StreamsJacksonMapper.getInstance();
+    lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
+    connectToWebHDFS();
+    path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
+  }
+
+  @Override
+  public void cleanUp() {
+    try {
+      flush();
+    } catch (IOException ex) {
+      LOGGER.error("Error flushing on cleanup", ex);
     }
-
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        DatumStatusCounter counters = new DatumStatusCounter();
-        counters.incrementAttempt(this.totalRecordsWritten);
-        counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
-        return counters;
+    try {
+      close();
+    } catch (IOException ex) {
+      LOGGER.error("Error closing on cleanup", ex);
     }
+  }
+
+  @Override
+  public DatumStatusCounter getDatumStatusCounter() {
+    DatumStatusCounter counters = new DatumStatusCounter();
+    counters.incrementAttempt(this.totalRecordsWritten);
+    counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
+    return counters;
+  }
 }
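
The reformatted class keeps its original lifecycle, which is easiest to see end to end. A minimal usage sketch, assuming an 'hdfs' section is present in the application's typesafe config (the example class and JSON document are illustrative, not from this commit):

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.hdfs.WebHdfsPersistWriter;

public class WebHdfsPersistWriterExample {

  public static void main(String[] args) {
    // The no-arg constructor resolves HdfsWriterConfiguration from the
    // 'hdfs' section of the JVM config, as shown in the constructor above.
    WebHdfsPersistWriter writer = new WebHdfsPersistWriter();

    // prepare() connects to WebHDFS and resolves the target path;
    // the configuration argument is ignored by this implementation.
    writer.prepare(null);

    // write() appends one line per datum, rolls to a new file after
    // linesPerFile records, and flushes after 64 MB of buffered bytes.
    writer.write(new StreamsDatum("{\"id\":\"example-1\"}"));

    // cleanUp() flushes and closes the current file.
    writer.cleanUp();
  }
}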

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
index 00cf17f..eb808c1 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
@@ -19,38 +19,43 @@
 package org.apache.streams.hdfs;
 
 import org.apache.streams.core.StreamsDatum;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
+/**
+ * WebHdfsPersistWriterTask writes to hdfs on behalf of
+ * @see org.apache.streams.hdfs.WebHdfsPersistWriter
+ */
 public class WebHdfsPersistWriterTask implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriterTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriterTask.class);
 
-    private WebHdfsPersistWriter writer;
+  private WebHdfsPersistWriter writer;
 
-    public WebHdfsPersistWriterTask(WebHdfsPersistWriter writer) {
-        this.writer = writer;
-    }
+  public WebHdfsPersistWriterTask(WebHdfsPersistWriter writer) {
+    this.writer = writer;
+  }
 
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {}
-        }
+  @Override
+  public void run() {
 
+    while (true) {
+      if ( writer.persistQueue.peek() != null ) {
+        try {
+          StreamsDatum entry = writer.persistQueue.remove();
+          writer.write(entry);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(1));
+      } catch (InterruptedException e) {}
     }
 
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
index 819414a..a35f124 100644
--- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
@@ -33,7 +33,7 @@ import static org.junit.Assert.*;
  */
 public class HdfsPersistConfigTest {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class);
 
     @Test
     public void getWriterFileUriTest()

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
index ff33ec3..7191d9a 100644
--- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
@@ -46,7 +46,7 @@ import java.util.List;
  */
 public class TestHdfsPersist {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class);
 
     ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index d54e794..64f7200 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -18,19 +18,14 @@
 
 package org.apache.streams.kafka;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import kafka.utils.VerifiableProperties;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,113 +40,132 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-public class KafkaPersistReader implements StreamsPersistReader, Serializable {
-
-    public final static String STREAMS_ID = "KafkaPersistReader";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private KafkaConfiguration config;
-
-    private ConsumerConnector consumerConnector;
-
-    public List<KafkaStream<String, String>> inStreams;
-
-    private ExecutorService executor = Executors.newSingleThreadExecutor();
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.Whitelist;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import kafka.utils.VerifiableProperties;
 
-    public KafkaPersistReader() {
-        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
-        this.persistQueue  = new ConcurrentLinkedQueue<>();
-    }
+/**
+ * KafkaPersistReader reads documents from kafka.
+ */
+public class KafkaPersistReader implements StreamsPersistReader, Serializable {
 
-    public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
-        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
-        this.persistQueue = persistQueue;
-    }
+  public static final String STREAMS_ID = "KafkaPersistReader";
 
-    public void setConfig(KafkaConfiguration config) {
-        this.config = config;
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    @Override
-    public void startStream() {
+  private ObjectMapper mapper = new ObjectMapper();
 
-        Properties props = new Properties();
-        props.setProperty("serializer.encoding", "UTF8");
+  private KafkaConfiguration config;
 
-        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+  private ConsumerConnector consumerConnector;
 
-        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+  public List<KafkaStream<String, String>> inStreams;
 
-        Whitelist topics = new Whitelist(config.getTopic());
-        VerifiableProperties vprops = new VerifiableProperties(props);
+  private ExecutorService executor = Executors.newSingleThreadExecutor();
 
-        inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+  /**
+   * KafkaPersistReader constructor - resolves KafkaConfiguration from JVM 'kafka'.
+   */
+  public KafkaPersistReader() {
+    this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
+  }
 
-        for (final KafkaStream stream : inStreams) {
-            executor.submit(new KafkaPersistReaderTask(this, stream));
-        }
+  /**
+   * KafkaPersistReader constructor - uses supplied persistQueue.
+   */
+  public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
+    this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+    this.persistQueue = persistQueue;
+  }
 
-    }
+  public void setConfig(KafkaConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public StreamsResultSet readCurrent() {
-        return null;
-    }
+  @Override
+  public void startStream() {
 
-    @Override
-    public StreamsResultSet readNew(BigInteger bigInteger) {
-        return null;
-    }
+    Properties props = new Properties();
+    props.setProperty("serializer.encoding", "UTF8");
 
-    @Override
-    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
-        return null;
-    }
+    ConsumerConfig consumerConfig = new ConsumerConfig(props);
 
-    @Override
-    public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
-    }
+    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 
-    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
-        Properties props = new Properties();
-        props.put("zookeeper.connect", a_zookeeper);
-        props.put("group.id", a_groupId);
-        props.put("zookeeper.session.timeout.ms", "400");
-        props.put("zookeeper.sync.time.ms", "200");
-        props.put("auto.commit.interval.ms", "1000");
-        return new ConsumerConfig(props);
-    }
+    Whitelist topics = new Whitelist(config.getTopic());
+    VerifiableProperties vprops = new VerifiableProperties(props);
 
-    @Override
-    public void prepare(Object configurationObject) {
+    inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
 
+    for (final KafkaStream stream : inStreams) {
+      executor.submit(new KafkaPersistReaderTask(this, stream));
     }
 
-    @Override
-    public void cleanUp() {
-        consumerConnector.shutdown();
-        while( !executor.isTerminated()) {
-            try {
-                executor.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException ignored) {}
-        }
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger bigInteger) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !executor.isShutdown() && !executor.isTerminated();
+  }
+
+  private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
+    Properties props = new Properties();
+    props.put("zookeeper.connect", zookeeper);
+    props.put("group.id", groupId);
+    props.put("zookeeper.session.timeout.ms", "400");
+    props.put("zookeeper.sync.time.ms", "200");
+    props.put("auto.commit.interval.ms", "1000");
+    return new ConsumerConfig(props);
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+
+  }
+
+  @Override
+  public void cleanUp() {
+    consumerConnector.shutdown();
+    while ( !executor.isTerminated()) {
+      try {
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
     }
+  }
 }
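
For context on how the reader is driven (none of this changes in the patch), a rough usage sketch assuming a 'kafka' config section supplies the topic and connection settings; the polling loop is illustrative:

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.kafka.KafkaPersistReader;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class KafkaPersistReaderExample {

  public static void main(String[] args) throws InterruptedException {
    // Hand the reader a queue; its reader tasks will fill it.
    Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<>();
    KafkaPersistReader reader = new KafkaPersistReader(queue);

    // startStream() builds the consumer streams for the configured topic
    // and submits one KafkaPersistReaderTask per stream.
    reader.startStream();

    // Drain whatever the tasks have added so far.
    for (int i = 0; i < 10; i++) {
      StreamsDatum datum = queue.poll();
      if (datum != null) {
        System.out.println(datum.getDocument());
      }
      Thread.sleep(100);
    }

    // cleanUp() shuts down the consumer connector and the executor.
    reader.cleanUp();
  }
}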

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
index 83493e0..199be73 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -18,45 +18,51 @@
 
 package org.apache.streams.kafka;
 
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.message.MessageAndMetadata;
 import org.apache.streams.core.StreamsDatum;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
-public class KafkaPersistReaderTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
-
-    private KafkaPersistReader reader;
-    private KafkaStream<String,String> stream;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
 
-    public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
-        this.reader = reader;
-        this.stream = stream;
-    }
+/**
+ * KafkaPersistReaderTask reads documents from kafka on behalf of
+ * @see org.apache.streams.kafka.KafkaPersistReader
+ */
+public class KafkaPersistReaderTask implements Runnable {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
 
+  private KafkaPersistReader reader;
+  private KafkaStream<String,String> stream;
 
-    @Override
-    public void run() {
+  public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
+    this.reader = reader;
+    this.stream = stream;
+  }
 
-        MessageAndMetadata<String,String> item;
-        while(true) {
+  @Override
+  public void run() {
 
-            ConsumerIterator<String, String> it = stream.iterator();
-            while (it.hasNext()) {
-                item = it.next();
-                reader.persistQueue.add(new StreamsDatum(item.message()));
-            }
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {}
-        }
+    MessageAndMetadata<String,String> item;
+    while (true) {
 
+      ConsumerIterator<String, String> it = stream.iterator();
+      while (it.hasNext()) {
+        item = it.next();
+        reader.persistQueue.add(new StreamsDatum(item.message()));
+      }
+      try {
+        Thread.sleep(new Random().nextInt(100));
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
     }
 
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 83032e6..40e125f 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -18,16 +18,15 @@
 
 package org.apache.streams.kafka;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.util.GuidUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,99 +35,114 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
-
-    public final static String STREAMS_ID = "KafkaPersistWriter";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+/**
+ * KafkaPersistWriter writes documents to kafka.
+ */
+public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
 
-    private ObjectMapper mapper = new ObjectMapper();
+  public static final String STREAMS_ID = "KafkaPersistWriter";
 
-    private KafkaConfiguration config;
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
 
-    private Producer<String, String> producer;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    public KafkaPersistWriter() {
-        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
-        this.persistQueue  = new ConcurrentLinkedQueue<>();
-    }
+  private ObjectMapper mapper = new ObjectMapper();
 
-    public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
-        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
-        this.persistQueue = persistQueue;
-    }
+  private KafkaConfiguration config;
 
-    public void setConfig(KafkaConfiguration config) {
-        this.config = config;
-    }
+  private Producer<String, String> producer;
 
-    public void start() {
-        Properties props = new Properties();
+  /**
+   * KafkaPersistWriter constructor - resolves KafkaConfiguration from JVM 'kafka'.
+   */
+  public KafkaPersistWriter() {
+    this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
+  }
 
-        props.put("metadata.broker.list", config.getBrokerlist());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
-        props.put("request.required.acks", "1");
+  /**
+   * KafkaPersistWriter constructor - uses supplied persistQueue.
+   */
+  public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
+    this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+    this.persistQueue = persistQueue;
+  }
 
-        ProducerConfig config = new ProducerConfig(props);
+  public void setConfig(KafkaConfiguration config) {
+    this.config = config;
+  }
 
-        producer = new Producer<>(config);
+  /**
+   * Run the persist writer thread.
+   */
+  public void start() {
+    Properties props = new Properties();
 
-        new Thread(new KafkaPersistWriterTask(this)).start();
-    }
+    props.put("metadata.broker.list", config.getBrokerlist());
+    props.put("serializer.class", "kafka.serializer.StringEncoder");
+    props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
+    props.put("request.required.acks", "1");
 
-    public void stop() {
-        producer.close();
-    }
+    ProducerConfig config = new ProducerConfig(props);
 
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
+    producer = new Producer<>(config);
 
-    public Queue<StreamsDatum> getPersistQueue() {
-        return this.persistQueue;
-    }
+    new Thread(new KafkaPersistWriterTask(this)).start();
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public void stop() {
+    producer.close();
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
 
-        try {
-            String text = mapper.writeValueAsString(entry);
+  public Queue<StreamsDatum> getPersistQueue() {
+    return this.persistQueue;
+  }
 
-            String hash = GuidUtils.generateGuid(text);
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-            KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text);
+  @Override
+  public void write(StreamsDatum entry) {
 
-            producer.send(data);
+    try {
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("save: {}", e);
-        }// put
-    }
+      String text = mapper.writeValueAsString(entry);
 
-    @Override
-    public void run() {
-        start();
+      String hash = GuidUtils.generateGuid(text);
 
-        // stop();
-    }
+      KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text);
 
-    @Override
-    public void prepare(Object configurationObject) {
-        start();
-    }
+      producer.send(data);
 
-    @Override
-    public void cleanUp() {
-        stop();
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn("save: {}", ex);
     }
+  }
+
+  @Override
+  public void run() {
+    start();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    start();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
 }
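
As with the reader, the writer's lifecycle is unchanged by the reformat. A brief sketch, assuming a 'kafka' config section supplies the broker list and topic (the document is made up):

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.kafka.KafkaPersistWriter;

public class KafkaPersistWriterExample {

  public static void main(String[] args) {
    // The no-arg constructor resolves KafkaConfiguration from the 'kafka'
    // section of the JVM config and creates an internal persist queue.
    KafkaPersistWriter writer = new KafkaPersistWriter();

    // prepare() calls start(): it builds the Producer and launches a
    // KafkaPersistWriterTask thread that drains the persist queue.
    writer.prepare(null);

    // Each datum is serialized to JSON and sent keyed by a generated guid.
    writer.write(new StreamsDatum("{\"id\":\"example-1\"}"));

    // cleanUp() calls stop(), which closes the producer.
    writer.cleanUp();
  }
}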

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
index 5d8ee9e..dae7aa2 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
@@ -19,38 +19,45 @@
 package org.apache.streams.kafka;
 
 import org.apache.streams.core.StreamsDatum;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
+/**
+ * KafkaPersistWriterTask writes documents to kafka on behalf of
+ * @see org.apache.streams.kafka.KafkaPersistWriter
+ */
 public class KafkaPersistWriterTask implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class);
 
-    private KafkaPersistWriter writer;
+  private KafkaPersistWriter writer;
 
-    public KafkaPersistWriterTask(KafkaPersistWriter writer) {
-        this.writer = writer;
-    }
+  public KafkaPersistWriterTask(KafkaPersistWriter writer) {
+    this.writer = writer;
+  }
 
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.getPersistQueue().peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {}
-        }
+  @Override
+  public void run() {
 
+    while (true) {
+      if ( writer.getPersistQueue().peek() != null ) {
+        try {
+          StreamsDatum entry = writer.persistQueue.remove();
+          writer.write(entry);
+        } catch (Exception ex) {
+          ex.printStackTrace();
+        }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(100));
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
     }
 
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
deleted file mode 100644
index ebfff9a..0000000
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-/**
- * Created by sblackmon on 12/15/13.
- */
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class StreamsPartitioner implements Partitioner<String> {
-    public StreamsPartitioner (VerifiableProperties props) {
-
-    }
-
-    public int partition(String key, int a_numPartitions) {
-        int partition = 0;
-        int offset = key.lastIndexOf('.');
-        if (offset > 0) {
-            partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
-        }
-        return partition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
index ba77ff1..b6a7404 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -18,6 +18,14 @@
 
 package org.apache.streams.mongo;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
@@ -30,13 +38,7 @@ import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,230 +54,248 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+/**
+ * MongoPersistReader reads documents from mongo.
+ */
 public class MongoPersistReader implements StreamsPersistReader {
 
-    public static final String STREAMS_ID = "MongoPersistReader";
+  public static final String STREAMS_ID = "MongoPersistReader";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
 
-    private ExecutorService executor;
+  private ExecutorService executor;
 
-    private MongoConfiguration config;
+  private MongoConfiguration config;
 
-    protected MongoClient client;
-    protected DB db;
-    protected DBCollection collection;
+  protected MongoClient client;
+  protected DB db;
+  protected DBCollection collection;
 
-    protected DBCursor cursor;
+  protected DBCursor cursor;
 
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public MongoPersistReader() {
-        this.config = new ComponentConfigurator<>(MongoConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
-    }
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public MongoPersistReader(MongoConfiguration config) {
-        this.config = config;
-    }
-
-    public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
-        this.config = new ComponentConfigurator<>(MongoConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
-        this.persistQueue = persistQueue;
-    }
+  /**
+   * MongoPersistReader constructor - resolves MongoConfiguration from JVM 'mongo'.
+   */
+  public MongoPersistReader() {
+    this.config = new ComponentConfigurator<>(MongoConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
+  }
 
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
+  /**
+   * MongoPersistReader constructor - uses supplied MongoConfiguration.
+   * @param config config
+   */
+  public MongoPersistReader(MongoConfiguration config) {
+    this.config = config;
+  }
 
-    public Queue<StreamsDatum> getPersistQueue() {
-        return persistQueue;
-    }
+  /**
+   * MongoPersistReader constructor - uses supplied persistQueue.
+   * @param persistQueue persistQueue
+   */
+  public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
+    this.config = new ComponentConfigurator<>(MongoConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
+    this.persistQueue = persistQueue;
+  }
 
-    public void stop() {
-    }
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  public void stop() {
+  }
 
-        connectToMongo();
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        if( client == null ||
-                collection == null )
-            throw new RuntimeException("Unable to connect!");
+  @Override
+  public void prepare(Object configurationObject) {
 
-        cursor = collection.find();
+    connectToMongo();
 
-        if( cursor == null || !cursor.hasNext())
-            throw new RuntimeException("Collection not present or empty!");
+    if ( client == null
+        || collection == null ) {
+      throw new RuntimeException("Unable to connect!");
+    }
+    cursor = collection.find();
 
-        persistQueue = constructQueue();
+    if ( cursor == null
+        || !cursor.hasNext()) {
+      throw new RuntimeException("Collection not present or empty!");
+    }
 
-        executor = Executors.newSingleThreadExecutor();
+    persistQueue = constructQueue();
 
-    }
+    executor = Executors.newSingleThreadExecutor();
 
-    @Override
-    public void cleanUp() {
-        stop();
-    }
+  }
 
-    protected StreamsDatum prepareDatum(DBObject dbObject) {
+  @Override
+  public void cleanUp() {
+    stop();
+  }
 
-        ObjectNode objectNode;
-        String id;
+  protected StreamsDatum prepareDatum(DBObject dbObject) {
 
-        try {
-            objectNode = mapper.readValue(dbObject.toString(), ObjectNode.class);
-            id = objectNode.get("_id").get("$oid").asText();
-            objectNode.remove("_id");
-        } catch (IOException e) {
-            LOGGER.warn("document isn't valid JSON.");
-            return null;
-        }
+    ObjectNode objectNode;
+    String id;
 
-        return new StreamsDatum(objectNode, id);
+    try {
+      objectNode = mapper.readValue(dbObject.toString(), ObjectNode.class);
+      id = objectNode.get("_id").get("$oid").asText();
+      objectNode.remove("_id");
+    } catch (IOException ex) {
+      LOGGER.warn("document isn't valid JSON.");
+      return null;
     }
 
-    private synchronized void connectToMongo() {
+    return new StreamsDatum(objectNode, id);
+  }
 
-        ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
+  private synchronized void connectToMongo() {
 
-        if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
-            MongoCredential credential =
-                    MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
-            client = new MongoClient(serverAddress, Lists.newArrayList(credential));
-        } else {
-            client = new MongoClient(serverAddress);
-        }
+    ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
 
-        db = client.getDB(config.getDb());
+    if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
+      MongoCredential credential =
+          MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
+      client = new MongoClient(serverAddress, Lists.newArrayList(credential));
+    } else {
+      client = new MongoClient(serverAddress);
+    }
 
-        if (!db.collectionExists(config.getCollection())) {
-            db.createCollection(config.getCollection(), null);
-        }
+    db = client.getDB(config.getDb());
 
-        collection = db.getCollection(config.getCollection());
+    if (!db.collectionExists(config.getCollection())) {
+      db.createCollection(config.getCollection(), null);
     }
 
-    @Override
-    public StreamsResultSet readAll() {
-
-        try (DBCursor cursor = collection.find()) {
-            while (cursor.hasNext()) {
-                DBObject dbObject = cursor.next();
-                StreamsDatum datum = prepareDatum(dbObject);
-                write(datum);
-            }
-        }
+    collection = db.getCollection(config.getCollection());
+  }
 
-        return readCurrent();
-    }
+  @Override
+  public StreamsResultSet readAll() {
 
-    @Override
-    public void startStream() {
+    try (DBCursor cursor = collection.find()) {
+      while (cursor.hasNext()) {
+        DBObject dbObject = cursor.next();
+        StreamsDatum datum = prepareDatum(dbObject);
+        write(datum);
+      }
+    }
 
-        LOGGER.debug("startStream");
-        MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
-        Thread readerTaskThread = new Thread(readerTask);
-        Future future = executor.submit(readerTaskThread);
+    return readCurrent();
+  }
 
-        while( !future.isDone() && !future.isCancelled()) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignored) {}
-        }
+  @Override
+  public void startStream() {
 
-        executor.shutdown();
+    LOGGER.debug("startStream");
+    MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
+    Thread readerTaskThread = new Thread(readerTask);
+    Future future = executor.submit(readerTaskThread);
 
+    while (!future.isDone() && !future.isCancelled()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
     }
 
-    @Override
-    public StreamsResultSet readCurrent() {
+    executor.shutdown();
 
-        StreamsResultSet current;
+  }
 
-        try {
-            lock.writeLock().lock();
-            current = new StreamsResultSet(persistQueue);
-            current.setCounter(new DatumStatusCounter());
-            persistQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
+  @Override
+  public StreamsResultSet readCurrent() {
 
-        return current;
-    }
+    StreamsResultSet current;
 
-    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
-    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
-    //if the queue is being replaced with a new instance
-    protected void write(StreamsDatum entry) {
-        boolean success;
-        do {
-            try {
-                lock.readLock().lock();
-                success = persistQueue.offer(entry);
-                Thread.yield();
-            } finally {
-                lock.readLock().unlock();
-            }
-        }
-        while (!success);
+    try {
+      lock.writeLock().lock();
+      current = new StreamsResultSet(persistQueue);
+      current.setCounter(new DatumStatusCounter());
+      persistQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
     }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
+    return current;
+  }
+
+  // The locking may appear counterintuitive, but we don't care if multiple threads offer to the queue,
+  // since it is a synchronized queue.  What we do care about is not offering to the current reference
+  // while the queue is being replaced with a new instance.
+  protected void write(StreamsDatum entry) {
+    boolean success;
+    do {
+      try {
+        lock.readLock().lock();
+        success = persistQueue.offer(entry);
+        Thread.yield();
+      } finally {
+        lock.readLock().unlock();
+      }
     }
+    while (!success);
+  }
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
 
-    @Override
-    public boolean isRunning() {
-        return !executor.isTerminated() || !executor.isShutdown();
-    }
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
 
-    private Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
-    }
+  @Override
+  public boolean isRunning() {
+    return !executor.isTerminated() || !executor.isShutdown();
+  }
 
-    public class MongoPersistReaderTask implements Runnable {
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+  }
 
-        private MongoPersistReader reader;
+  public class MongoPersistReaderTask implements Runnable {
 
-        public MongoPersistReaderTask(MongoPersistReader reader) {
-            this.reader = reader;
-        }
+    private MongoPersistReader reader;
 
-        @Override
-        public void run() {
+    public MongoPersistReaderTask(MongoPersistReader reader) {
+      this.reader = reader;
+    }
 
-            try {
-                while(reader.cursor.hasNext()) {
-                    DBObject dbObject = reader.cursor.next();
-                    StreamsDatum datum = reader.prepareDatum(dbObject);
-                    reader.write(datum);
-                }
-            } finally {
-                reader.cursor.close();
-            }
+    @Override
+    public void run() {
 
+      try {
+        while (reader.cursor.hasNext()) {
+          DBObject dbObject = reader.cursor.next();
+          StreamsDatum datum = reader.prepareDatum(dbObject);
+          reader.write(datum);
         }
+      } finally {
+        reader.cursor.close();
+      }
 
     }
+
+  }
 }

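For anyone picking this component up, here is a minimal sketch of driving MongoPersistReader on its own, outside a stream runtime. It is an illustration only, not part of this commit: it assumes a reachable mongod and an application config whose 'mongo' block supplies host, port, db, and collection, and it assumes the target collection is non-empty, since prepare() throws otherwise.

    import org.apache.streams.core.StreamsResultSet;
    import org.apache.streams.mongo.MongoPersistReader;

    public class MongoReadSketch {
      public static void main(String[] args) {
        // Resolves MongoConfiguration from the JVM 'mongo' config path, as the
        // no-arg constructor above does.
        MongoPersistReader reader = new MongoPersistReader();

        // connectToMongo(), open the cursor, build the bounded in-memory queue.
        reader.prepare(null);

        // Drains the collection into StreamsDatums; practical for collections that
        // fit comfortably inside the 10,000-element synchronized queue.
        StreamsResultSet resultSet = reader.readAll();
        System.out.println("read " + resultSet.size() + " documents");

        reader.cleanUp();
      }
    }
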
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index 5f6ac1f..6072f58 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -18,6 +18,12 @@
 
 package org.apache.streams.mongo;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
@@ -29,14 +35,12 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.util.JSON;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.Flushable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -50,209 +54,217 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
+public class MongoPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable {
 
-    public final static String STREAMS_ID = "MongoPersistWriter";
+  public static final String STREAMS_ID = "MongoPersistWriter";
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class);
 
-    private final static long MAX_WRITE_LATENCY = 1000;
+  private static final long MAX_WRITE_LATENCY = 1000;
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
-    private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+  private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
 
-    private MongoConfiguration config;
+  private MongoConfiguration config;
 
-    protected MongoClient client;
-    protected DB db;
-    protected DBCollection collection;
+  protected MongoClient client;
+  protected DB db;
+  protected DBCollection collection;
 
-    protected List<DBObject> insertBatch = new ArrayList<>();
+  protected List<DBObject> insertBatch = new ArrayList<>();
 
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public MongoPersistWriter() {
-        this(new ComponentConfigurator<>(MongoConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")));
-    }
-
-    public MongoPersistWriter(MongoConfiguration config) {
-        this.config = config;
-    }
+  public MongoPersistWriter() {
+    this(new ComponentConfigurator<>(MongoConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")));
+  }
 
-    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
-        this.persistQueue = persistQueue;
-    }
-
-    public Queue<StreamsDatum> getPersistQueue() {
-        return persistQueue;
-    }
+  public MongoPersistWriter(MongoConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+    this.persistQueue = persistQueue;
+  }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
+  public Queue<StreamsDatum> getPersistQueue() {
+    return persistQueue;
+  }
 
-        DBObject dbObject = prepareObject(streamsDatum);
-        if (dbObject != null) {
-            addToBatch(dbObject);
-            flushIfNecessary();
-        }
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    public void flush() throws IOException {
-        try {
-            LOGGER.debug("Attempting to flush {} items to mongo", insertBatch.size());
-            lock.writeLock().lock();
-            collection.insert(insertBatch);
-            lastWrite.set(System.currentTimeMillis());
-            insertBatch = new ArrayList<>();
-        } finally {
-            lock.writeLock().unlock();
-        }
+  @Override
+  public void write(StreamsDatum streamsDatum) {
 
+    DBObject dbObject = prepareObject(streamsDatum);
+    if (dbObject != null) {
+      addToBatch(dbObject);
+      flushIfNecessary();
     }
-
-    public synchronized void close() throws IOException {
-//        client.cleanCursors(true);
-//        backgroundFlushTask.shutdownNow();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    try {
+      LOGGER.debug("Attempting to flush {} items to mongo", insertBatch.size());
+      lock.writeLock().lock();
+      collection.insert(insertBatch);
+      lastWrite.set(System.currentTimeMillis());
+      insertBatch = new ArrayList<>();
+    } finally {
+      lock.writeLock().unlock();
     }
 
-    public void start() {
-        connectToMongo();
-        backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                flushIfNecessary();
-            }
-        }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+  }
+
+  public synchronized void close() throws IOException {
+    client.close();
+    backgroundFlushTask.shutdownNow();
+  }
+
+  /**
+   * Start the writer: connect to Mongo and schedule the background flush task.
+   */
+  public void start() {
+    connectToMongo();
+    backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        flushIfNecessary();
+      }
+    }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Stop the writer: flush pending inserts, close the client, and stop the flush task.
+   */
+  public void stop() {
+
+    try {
+      flush();
+    } catch (IOException ex) {
+      LOGGER.error("Error flushing", ex);
     }
-
-    public void stop() {
-
-        try {
-            flush();
-        } catch (IOException e) {
-            LOGGER.error("Error flushing", e);
-        }
-        try {
-            close();
-        } catch (IOException e) {
-            LOGGER.error("Error closing", e);
-        }
-        try {
-            backgroundFlushTask.shutdown();
-            // Wait a while for existing tasks to terminate
-            if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
-                backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
-                    LOGGER.error("Stream did not terminate");
-                }
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            backgroundFlushTask.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-
+    try {
+      close();
+    } catch (IOException ex) {
+      LOGGER.error("Error closing", ex);
     }
-
-    @Override
-    public void run() {
-
-        while (true) {
-            if (persistQueue.peek() != null) {
-                try {
-                    StreamsDatum entry = persistQueue.remove();
-                    write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException ignored) {
-            }
+    try {
+      backgroundFlushTask.shutdown();
+      // Wait a while for existing tasks to terminate
+      if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+        backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+          LOGGER.error("Stream did not terminate");
         }
-
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.persistQueue = new ConcurrentLinkedQueue<>();
-        start();
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      backgroundFlushTask.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
     }
 
-    @Override
-    public void cleanUp() {
-        stop();
-    }
+  }
 
-    protected void flushIfNecessary() {
-        long lastLatency = System.currentTimeMillis() - lastWrite.get();
-        //Flush iff the size > 0 AND the size is divisible by 100 or the time between now and the last flush is greater
-        //than the maximum desired latency
-        if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) {
-            try {
-                flush();
-            } catch (IOException e) {
-                LOGGER.error("Error writing to Mongo", e);
-            }
-        }
-    }
+  @Override
+  public void run() {
 
-    protected void addToBatch(DBObject dbObject) {
+    while (true) {
+      if (persistQueue.peek() != null) {
         try {
-            lock.readLock().lock();
-            insertBatch.add(dbObject);
-        } finally {
-            lock.readLock().unlock();
+          StreamsDatum entry = persistQueue.remove();
+          write(entry);
+        } catch (Exception ex) {
+          LOGGER.error("Error writing entry to Mongo", ex);
         }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(1));
+      } catch (InterruptedException interrupt) {
+        LOGGER.trace("Interrupt", interrupt);
+      }
     }
 
-    protected DBObject prepareObject(StreamsDatum streamsDatum) {
-        DBObject dbObject = null;
-        if (streamsDatum.getDocument() instanceof String) {
-            dbObject = (DBObject) JSON.parse((String) streamsDatum.getDocument());
-        } else {
-            try {
-                ObjectNode node = mapper.valueToTree(streamsDatum.getDocument());
-                dbObject = (DBObject) JSON.parse(node.toString());
-            } catch (Exception e) {
-                LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), e);
-            }
-        }
-        return dbObject;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.persistQueue = new ConcurrentLinkedQueue<>();
+    start();
+  }
+
+  @Override
+  public void cleanUp() {
+    stop();
+  }
+
+  protected void flushIfNecessary() {
+    long lastLatency = System.currentTimeMillis() - lastWrite.get();
+    // Flush if the batch is non-empty AND either its size is divisible by 100 or the time
+    // since the last flush exceeds the maximum desired write latency.
+    if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) {
+      try {
+        flush();
+      } catch (IOException ex) {
+        LOGGER.error("Error writing to Mongo", ex);
+      }
     }
+  }
+
+  protected void addToBatch(DBObject dbObject) {
+    try {
+      lock.readLock().lock();
+      insertBatch.add(dbObject);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  protected DBObject prepareObject(StreamsDatum streamsDatum) {
+    DBObject dbObject = null;
+    if (streamsDatum.getDocument() instanceof String) {
+      dbObject = (DBObject) JSON.parse((String) streamsDatum.getDocument());
+    } else {
+      try {
+        ObjectNode node = mapper.valueToTree(streamsDatum.getDocument());
+        dbObject = (DBObject) JSON.parse(node.toString());
+      } catch (Exception ex) {
+        LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), ex);
+      }
+    }
+    return dbObject;
+  }
 
-    private synchronized void connectToMongo() {
-
-        ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
+  private synchronized void connectToMongo() {
 
-        if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
-            MongoCredential credential =
-                    MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
-            client = new MongoClient(serverAddress, Lists.newArrayList(credential));
-        } else {
-            client = new MongoClient(serverAddress);
-        }
+    ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
 
-        db = client.getDB(config.getDb());
+    if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
+      MongoCredential credential =
+          MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
+      client = new MongoClient(serverAddress, Lists.newArrayList(credential));
+    } else {
+      client = new MongoClient(serverAddress);
+    }
 
-        if (!db.collectionExists(config.getCollection())) {
-            db.createCollection(config.getCollection(), null);
-        }
+    db = client.getDB(config.getDb());
 
-        collection = db.getCollection(config.getCollection());
+    if (!db.collectionExists(config.getCollection())) {
+      db.createCollection(config.getCollection(), null);
     }
 
+    collection = db.getCollection(config.getCollection());
+  }
+
 
 }

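A matching sketch for the rewritten MongoPersistWriter, again illustrative rather than part of the commit. The class and method calls mirror the diff above; the inline HOCON keys (host, port, db, collection), the sample document, and the running mongod are assumptions.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.streams.config.ComponentConfigurator;
    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.mongo.MongoConfiguration;
    import org.apache.streams.mongo.MongoPersistWriter;

    public class MongoWriteSketch {
      public static void main(String[] args) {
        // Hypothetical inline config; a real deployment would read this from
        // application.conf, as MongoPersistIT does below.
        Config typesafe = ConfigFactory.parseString(
            "mongo { host = localhost, port = 27017, db = streams, collection = activities }")
            .withFallback(ConfigFactory.load()).resolve();
        MongoConfiguration config = new ComponentConfigurator<>(MongoConfiguration.class)
            .detectConfiguration(typesafe, "mongo");

        MongoPersistWriter writer = new MongoPersistWriter(config);

        // connectToMongo() and schedule flushIfNecessary() every 2 * MAX_WRITE_LATENCY ms.
        writer.prepare(null);

        // String documents are parsed straight to DBObjects; other documents go
        // through the Jackson mapper first (see prepareObject above).
        writer.write(new StreamsDatum("{\"verb\":\"post\"}", "doc-1"));

        // cleanUp() -> stop(): flush the remaining batch, close the client, and
        // shut down the background flush task.
        writer.cleanUp();
      }
    }
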
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
index 18f5a62..2a2e170 100644
--- a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
+++ b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
@@ -18,12 +18,6 @@
 
 package org.apache.streams.mongo.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
@@ -32,6 +26,14 @@ import org.apache.streams.mongo.MongoConfiguration;
 import org.apache.streams.mongo.MongoPersistReader;
 import org.apache.streams.mongo.MongoPersistWriter;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -48,64 +50,64 @@ import static org.junit.Assert.assertEquals;
  */
 public class MongoPersistIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    MongoConfiguration testConfiguration;
+  MongoConfiguration testConfiguration;
 
-    int count = 0;
+  int count = 0;
 
-    @Before
-    public void setup() throws Exception {
+  @Before
+  public void setup() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/MongoPersistIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo");
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/MongoPersistIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo");
 
-    }
+  }
 
-    @Test
-    public void testMongoPersist() throws Exception {
+  @Test
+  public void testMongoPersist() throws Exception {
 
-        MongoPersistWriter writer = new MongoPersistWriter(testConfiguration);
+    MongoPersistWriter writer = new MongoPersistWriter(testConfiguration);
 
-        writer.prepare(null);
+    writer.prepare(null);
 
-        InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader()
-                .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+    InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
 
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
-            activity.getAdditionalProperties().remove("$license");
-            StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
-            writer.write( datum );
-            LOGGER.info("Wrote: " + activity.getVerb() );
-            count++;
-        }
+    for (String file : files) {
+      LOGGER.info("File: " + file);
+      InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+      activity.getAdditionalProperties().remove("$license");
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      writer.write(datum);
+      LOGGER.info("Wrote: " + activity.getVerb());
+      count++;
+    }
 
-        LOGGER.info("Total Written: {}", count );
+    LOGGER.info("Total Written: {}", count );
 
-        assertEquals( 89, count );
+    assertEquals(89, count);
 
-        writer.cleanUp();
+    writer.cleanUp();
 
-        MongoPersistReader reader = new MongoPersistReader(testConfiguration);
+    MongoPersistReader reader = new MongoPersistReader(testConfiguration);
 
-        reader.prepare(null);
+    reader.prepare(null);
 
-        StreamsResultSet resultSet = reader.readAll();
+    StreamsResultSet resultSet = reader.readAll();
 
-        LOGGER.info("Total Read: {}", resultSet.size() );
+    LOGGER.info("Total Read: {}", resultSet.size() );
 
-        assertEquals( 89, resultSet.size() );
+    assertEquals(89, resultSet.size());
 
-    }
+  }
 }
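
One detail worth keeping in mind when extending this test: the reader's prepareDatum(...) promotes Mongo's generated ObjectId into the StreamsDatum id and strips it from the document before handing it downstream. A standalone illustration of that mapping follows; the sample JSON is invented.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class ObjectIdMappingSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Shape of a document as the DBCursor hands it back (values are made up).
        String fromMongo = "{\"_id\":{\"$oid\":\"507f1f77bcf86cd799439011\"},\"verb\":\"post\"}";

        ObjectNode objectNode = mapper.readValue(fromMongo, ObjectNode.class);
        String id = objectNode.get("_id").get("$oid").asText(); // becomes the StreamsDatum id
        objectNode.remove("_id");                               // document no longer carries the Mongo key

        System.out.println(id);         // 507f1f77bcf86cd799439011
        System.out.println(objectNode); // {"verb":"post"}
      }
    }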