You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:44 UTC

[03/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-storm/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-storm/pom.xml b/streams-runtimes/streams-runtime-storm/pom.xml
deleted file mode 100644
index ded3efc..0000000
--- a/streams-runtimes/streams-runtime-storm/pom.xml
+++ /dev/null
@@ -1,124 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>streams-runtimes</artifactId>
-        <groupId>org.apache.streams</groupId>
-        <version>0.4-incubating-SNAPSHOT</version>
-    </parent>
-    <artifactId>streams-runtime-storm</artifactId>
-    <name>${project.artifactId}</name>
-    <description>Apache Streams Runtimes</description>
-
-    <properties>
-        <storm.version>0.9.1-incubating</storm.version>
-        <scala.version>2.9.2</scala.version>
-        <zkclient.version>0.4</zkclient.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-config</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-util</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-collections4</artifactId>
-            <version>4.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${storm.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
-            <scope>compile</scope>
-            <type>jar</type>
-        </dependency>
-        <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-            <version>${zkclient.version}</version>
-            <scope>compile</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-testing</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <sourceDirectory>src/main/java</sourceDirectory>
-        <testSourceDirectory>src/test/java</testSourceDirectory>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-            </resource>
-        </resources>
-        <testResources>
-            <testResource>
-                <directory>src/test/resources</directory>
-            </testResource>
-        </testResources>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
----------------------------------------------------------------------
diff --git a/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java b/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
index 8f22450..6344c3c 100644
--- a/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
+++ b/streams-schemas/streams-schema-activitystreams/src/test/java/org/w3c/activitystreams/test/SchemaValidationTest.java
@@ -40,47 +40,50 @@ import java.util.Set;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
+/**
+ * Test validity of documents vs schemas.
+ */
 public class SchemaValidationTest {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(SchemaValidationTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaValidationTest.class);
 
-    private final static ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectMapper MAPPER = new ObjectMapper();
 
-    /**
-     * Tests that activities matching core-ex* can be parsed by apache streams
-     *
-     * @throws Exception
-     */
-    @Test
-    public void validateToSchema() throws Exception {
+  /**
+   * Tests that activities matching core-ex* can be parsed by apache streams.
+   *
+   * @throws Exception Test Exception
+   */
+  @Test
+  public void testValidateToSchema() throws Exception {
 
-        JsonSchemaFactory factory = new JsonSchemaFactory();
+    JsonSchemaFactory factory = new JsonSchemaFactory();
 
-        InputStream testActivityFolderStream = SchemaValidationTest.class.getClassLoader()
-                .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+    InputStream testActivityFolderStream = SchemaValidationTest.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
 
-        for (String file : files) {
-            if( !file.startsWith(".") ) {
+    for (String file : files) {
+      if ( !file.startsWith(".") ) {
 
-                LOGGER.info("Test File: activities/" + file);
-                String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/activities/" + file)));
-                LOGGER.info("Test Document JSON: " + testFileString);
-                JsonNode testNode = MAPPER.readValue(testFileString, ObjectNode.class);
-                LOGGER.info("Test Document Object:" + testNode);
-                LOGGER.info("Test Schema File: " + "target/classes/verbs/" + file);
-                String testSchemaString = new String(Files.readAllBytes(Paths.get("target/classes/verbs/" + file)));
-                LOGGER.info("Test Schema JSON: " + testSchemaString);
-                JsonNode testSchemaNode = MAPPER.readValue(testFileString, ObjectNode.class);
-                LOGGER.info("Test Schema Object:" + testSchemaNode);
-                JsonSchema testSchema = factory.getSchema(testSchemaNode);
-                LOGGER.info("Test Schema:" + testSchema);
+        LOGGER.info("Test File: activities/" + file);
+        String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/activities/" + file)));
+        LOGGER.info("Test Document JSON: " + testFileString);
+        JsonNode testNode = MAPPER.readValue(testFileString, ObjectNode.class);
+        LOGGER.info("Test Document Object:" + testNode);
+        LOGGER.info("Test Schema File: " + "target/classes/verbs/" + file);
+        String testSchemaString = new String(Files.readAllBytes(Paths.get("target/classes/verbs/" + file)));
+        LOGGER.info("Test Schema JSON: " + testSchemaString);
+        JsonNode testSchemaNode = MAPPER.readValue(testFileString, ObjectNode.class);
+        LOGGER.info("Test Schema Object:" + testSchemaNode);
+        JsonSchema testSchema = factory.getSchema(testSchemaNode);
+        LOGGER.info("Test Schema:" + testSchema);
 
-                Set<ValidationMessage> errors = testSchema.validate(testNode);
-                assertThat(errors.size(), is(0));
+        Set<ValidationMessage> errors = testSchema.validate(testNode);
+        assertThat(errors.size(), is(0));
 
-            }
-        }
+      }
     }
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java b/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
index b1b5824..8500efd 100644
--- a/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
+++ b/streams-schemas/streams-schema-activitystreams2/src/test/java/org/w3c/activitystreams/test/ExamplesSerDeIT.java
@@ -33,103 +33,106 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
 
+/**
+ * Tests that activities matching core-ex* can be parsed by apache streams.
+ */
 public class ExamplesSerDeIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ExamplesSerDeIT.class);
-
-    private final static ObjectMapper MAPPER = new ObjectMapper();
-
-    /**
-     * Tests that activities matching core-ex* can be parsed by apache streams
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testCoreSerDe() throws Exception {
-
-        InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
-                .getResourceAsStream("w3c/activitystreams-master/test");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
-        for (String file : files) {
-            if( !file.startsWith(".") && file.contains("core-ex") ) {
-                LOGGER.info("File: activitystreams-master/test/" + file);
-                String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
-                LOGGER.info("Content: " + testFileString);
-                ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
-                LOGGER.info("Object:" + testFileObjectNode);
-            }
-        }
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExamplesSerDeIT.class);
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Tests that activities matching core-ex* can be parsed by apache streams.
+   *
+   * @throws Exception test exception
+   */
+  @Test
+  public void testCoreSerDe() throws Exception {
+
+    InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+        .getResourceAsStream("w3c/activitystreams-master/test");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    for (String file : files) {
+      if ( !file.startsWith(".") && file.contains("core-ex") ) {
+        LOGGER.info("File: activitystreams-master/test/" + file);
+        String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+        LOGGER.info("Content: " + testFileString);
+        ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+        LOGGER.info("Object:" + testFileObjectNode);
+      }
     }
-
-    /**
-     * Tests that activities matching simple* can be parsed by apache streams
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testSimpleSerDe() throws Exception {
-
-        InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
-                .getResourceAsStream("w3c/activitystreams-master/test");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
-        for (String file : files) {
-            if( !file.startsWith(".") && file.contains("simple") ) {
-                LOGGER.info("File: activitystreams-master/test/" + file);
-                String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
-                LOGGER.info("Content: " + testFileString);
-                ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
-                LOGGER.info("Object:" + testFileObjectNode);
-            }
-        }
+  }
+
+  /**
+   * Tests that activities matching simple* can be parsed by apache streams.
+   *
+   * @throws Exception test exception
+   */
+  @Test
+  public void testSimpleSerDe() throws Exception {
+
+    InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+        .getResourceAsStream("w3c/activitystreams-master/test");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    for (String file : files) {
+      if ( !file.startsWith(".") && file.contains("simple") ) {
+        LOGGER.info("File: activitystreams-master/test/" + file);
+        String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+        LOGGER.info("Content: " + testFileString);
+        ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+        LOGGER.info("Object:" + testFileObjectNode);
+      }
     }
-
-    /**
-     * Tests that activities matching vocabulary-ex* can be parsed by apache streams
-     *
-     * @throws Exception
-     */
-    @Ignore
-    @Test
-    public void testVocabularySerDe() throws Exception {
-
-        InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
-                .getResourceAsStream("w3c/activitystreams-master/test");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
-        for (String file : files) {
-            if( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
-                LOGGER.info("File: activitystreams-master/test/" + file);
-                String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
-                LOGGER.info("Content: " + testFileString);
-                ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
-                LOGGER.info("Object:" + testFileObjectNode);
-            }
-        }
+  }
+
+  /**
+   * Tests that activities matching vocabulary-ex* can be parsed by apache streams.
+   *
+   * @throws Exception test exception
+   */
+  @Ignore
+  @Test
+  public void testVocabularySerDe() throws Exception {
+
+    InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+        .getResourceAsStream("w3c/activitystreams-master/test");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    for (String file : files) {
+      if ( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
+        LOGGER.info("File: activitystreams-master/test/" + file);
+        String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+        LOGGER.info("Content: " + testFileString);
+        ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+        LOGGER.info("Object:" + testFileObjectNode);
+      }
     }
-
-    /**
-     * Tests that activities expect to fail cannot be parsed by apache streams
-     *
-     * @throws Exception
-     */
-    @Ignore
-    @Test
-    public void testFailSerDe() throws Exception {
-
-        InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
-                .getResourceAsStream("w3c/activitystreams-master/test/fail");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
-
-        for (String file : files) {
-            if( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
-                LOGGER.info("File: activitystreams-master/test/fail/" + file);
-                String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
-                LOGGER.info("Content: " + testFileString);
-                ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
-                LOGGER.info("Object:" + testFileObjectNode);
-            }
-        }
+  }
+
+  /**
+   * Tests that activities expected to fail cannot be parsed by apache streams.
+   *
+   * @throws Exception test exception
+   */
+  @Ignore
+  @Test
+  public void testFailSerDe() throws Exception {
+
+    InputStream testActivityFolderStream = ExamplesSerDeIT.class.getClassLoader()
+        .getResourceAsStream("w3c/activitystreams-master/test/fail");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+
+    for (String file : files) {
+      if ( !file.startsWith(".") && file.contains("vocabulary-ex") ) {
+        LOGGER.info("File: activitystreams-master/test/fail/" + file);
+        String testFileString = new String(Files.readAllBytes(Paths.get("target/test-classes/w3c/activitystreams-master/test/" + file)));
+        LOGGER.info("Content: " + testFileString);
+        ObjectNode testFileObjectNode = MAPPER.readValue(testFileString, ObjectNode.class);
+        LOGGER.info("Object:" + testFileObjectNode);
+      }
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 6037f28..514c851 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -15,126 +15,116 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.*;
 import java.lang.management.ManagementFactory;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
 
 /**
  * Common utilities for Streams components.
  */
 public class ComponentUtils {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
 
-    /**
-     * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors
-     * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been
-     * successfully queued onto the desired queue
-     * @param entry item to queue
-     * @param queue queue to add the entry to
-     * @param <T>
-     */
-    public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
-        boolean success;
-        do {
-            success = queue.offer(entry);
-            Thread.yield();
-        }
-        while( !success );
+  /**
+   * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors
+   * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been
+   * successfully queued onto the desired queue.
+   * @param entry item to queue
+   * @param queue queue to add the entry to
+   * @param <T> type
+   */
+  public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
+    boolean success;
+    do {
+      success = queue.offer(entry);
+      Thread.yield();
     }
+    while ( !success );
+  }
 
-    /**
-     * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending
-     * on the type of queue.  <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned
-     * or the queue is empty.  If the queue is empty it will return NULL.
-     * @param queue
-     * @param <T>
-     * @return
-     */
-    public static <T> T pollWhileNotEmpty(Queue<T> queue) {
-        T item = queue.poll();
-        while(!queue.isEmpty() && item == null) {
-            Thread.yield();
-            item = queue.poll();
-        }
-        return item;
+  /**
+   * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending
+   * on the type of queue.  <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned
+   * or the queue is empty.  If the queue is empty it will return NULL.
+   * @param queue queue to read the entry from
+   * @param <T> type
+   * @return result
+   */
+  public static <T> T pollWhileNotEmpty(Queue<T> queue) {
+    T item = queue.poll();
+    while (!queue.isEmpty() && item == null) {
+      Thread.yield();
+      item = queue.poll();
     }
+    return item;
+  }
 
-
-    public static String pollUntilStringNotEmpty(Queue queue) {
-
-        String result = null;
-        do {
-            synchronized( ComponentUtils.class ) {
-                try {
-                    result = (String) queue.remove();
-                } catch( Exception e ) {}
-            }
-            Thread.yield();
+  /**
+   * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()}
+   * and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}
+   * of an {@link java.util.concurrent.ExecutorService}.
+   * @param stream service to be shutdown
+   * @param initialWait time in seconds to wait for currently running threads to finish execution
+   * @param secondaryWait time in seconds to wait for running threads that did not terminate to acknowledge their forced termination
+   */
+  public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
+    stream.shutdown();
+    try {
+      if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
+        stream.shutdownNow();
+        if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
+          LOGGER.error("Executor Service did not terminate");
         }
-        while( result == null && !StringUtils.isNotEmpty(result) );
-
-        return result;
+      }
+    } catch (InterruptedException ie) {
+      stream.shutdownNow();
+      Thread.currentThread().interrupt();
     }
+  }
 
-    /**
-     * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()} and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}
-     * of an {@link java.util.concurrent.ExecutorService}.
-     * @param stream service to be shutdown
-     * @param initialWait time in seconds to wait for currently running threads to finish execution
-     * @param secondaryWait time in seconds to wait for running threads that did not terminate in the first wait to acknowledge their forced termination
-     */
-    public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
-        stream.shutdown();
-        try {
-            if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
-                stream.shutdownNow();
-                if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
-                    LOGGER.error("Executor Service did not terminate");
-                }
-            }
-        } catch (InterruptedException ie) {
-            stream.shutdownNow();
-            Thread.currentThread().interrupt();
-        }
+  /**
+   * Removes all mbeans registered under a specific domain.  Made specifically to clean up after unit tests.
+   * @param domain mbean domain
+   */
+  public static void removeAllMBeansOfDomain(String domain) throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    domain = domain.endsWith(":") ? domain : domain + ":";
+    ObjectName objectName = new ObjectName(domain + "*");
+    Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
+    for (ObjectName name : mbeanNames) {
+      mbs.unregisterMBean(name);
     }
+  }
 
-    /**
-     * Removes all mbeans registered undered a specific domain.  Made specificly to clean up at unit tests
-     * @param domain
-     */
-    public static void removeAllMBeansOfDomain(String domain) throws Exception {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        domain = domain.endsWith(":") ? domain : domain+":";
-        ObjectName objectName = new ObjectName(domain+"*");
-        Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
-        for(ObjectName name : mbeanNames) {
-            mbs.unregisterMBean(name);
-        }
-    }
-
-    /**
-     * Attempts to register an object with local MBeanServer.  Throws runtime exception on errors.
-     * @param name name to register bean with
-     * @param mbean mbean to register
-     */
-    public static <V> void registerLocalMBean(String name, V mbean) {
-        try {
-            ObjectName objectName = new ObjectName(name);
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(mbean, objectName);
-        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
-            LOGGER.error("Failed to register MXBean : {}", e);
-            throw new RuntimeException(e);
-        }
+  /**
+   * Attempts to register an object with local MBeanServer.  Throws runtime exception on errors.
+   * @param name name to register bean with
+   * @param mbean mbean to register
+   */
+  public static <V> void registerLocalMBean(String name, V mbean) {
+    try {
+      ObjectName objectName = new ObjectName(name);
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      mbs.registerMBean(mbean, objectName);
+    } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException ex) {
+      LOGGER.error("Failed to register MXBean : {}", ex);
+      throw new RuntimeException(ex);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/DateUtil.java b/streams-util/src/main/java/org/apache/streams/util/DateUtil.java
deleted file mode 100644
index 7bbb8e9..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/DateUtil.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.util;
-
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-
-/*
- *
- * If you can think of a better way, feel free to implement. This was a great class that I found that
- * solves the majority of the issue I was dealing with.
- *
- * smashew 11=13=2012
- *
- * Site:
- * http://stackoverflow.com/questions/3389348/parse-any-date-in-java
- */
-
-public class DateUtil
-{
-
-    private static final String REGEX_ONLY_NUMBERS = "[0-9]+";
-
-	private static final Map<String, String> DATE_FORMAT_REGEXPS = new HashMap<String, String>()
-	{
-		private static final long serialVersionUID = 1L;
-		{
-			put("^\\d{8}$", "yyyyMMdd");
-			put("^\\d{1,2}-\\d{1,2}-\\d{4}$", "dd-MM-yyyy");
-			put("^\\d{4}-\\d{1,2}-\\d{1,2}$", "yyyy-MM-dd");
-			put("^\\d{1,2}/\\d{1,2}/\\d{4}$", "MM/dd/yyyy");
-			put("^\\d{4}/\\d{1,2}/\\d{1,2}$", "yyyy/MM/dd");
-			put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}$", "dd MMM yyyy");
-			put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}$", "dd MMMM yyyy");
-			put("^\\d{12}$", "yyyyMMddHHmm");
-			put("^\\d{8}\\s\\d{4}$", "yyyyMMdd HHmm");
-			put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}$", "dd-MM-yyyy HH:mm");
-			put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy-MM-dd HH:mm");
-			put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}$", "MM/dd/yyyy HH:mm");
-			put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}$", "yyyy/MM/dd HH:mm");
-			put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMM yyyy HH:mm");
-			put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}$", "dd MMMM yyyy HH:mm");
-			put("^\\d{14}$", "yyyyMMddHHmmss");
-			put("^\\d{8}\\s\\d{6}$", "yyyyMMdd HHmmss");
-			put("^\\d{1,2}-\\d{1,2}-\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd-MM-yyyy HH:mm:ss");
-			put("^\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy-MM-dd HH:mm:ss");
-			put("^\\d{1,2}/\\d{1,2}/\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "MM/dd/yyyy HH:mm:ss");
-			put("^\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{1,2}:\\d{2}:\\d{2}$", "yyyy/MM/dd HH:mm:ss");
-			put("^\\d{1,2}\\s[a-z]{3}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMM yyyy HH:mm:ss");
-			put("^\\d{1,2}\\s[a-z]{4,}\\s\\d{4}\\s\\d{1,2}:\\d{2}:\\d{2}$", "dd MMMM yyyy HH:mm:ss");
-		}
-	};
-
-	/**
-	 * Determine SimpleDateFormat pattern matching with the given date string. Returns null if format is unknown. You
-	 * can simply extend DateUtil with more formats if needed.
-	 *
-	 * @param dateString
-	 *             The date string to determine the SimpleDateFormat pattern for.
-	 * @return The matching SimpleDateFormat pattern, or null if format is unknown.
-	 * @see java.text.SimpleDateFormat
-	 */
-	public static String determineDateFormat(String dateString)
-        throws ParseException
-	{
-		for (String regexp : DATE_FORMAT_REGEXPS.keySet())
-			if (dateString.toLowerCase().matches(regexp))
-				return DATE_FORMAT_REGEXPS.get(regexp);
-
-        throw new ParseException("unable to parse date",0);
-	}
-
-	public static DateTime determineDate(String dateString)
-		throws ParseException
-	{
-        // Trim the string just in case it is dirty.
-        dateString = dateString.trim();
-
-        // check to see if it looks like it is millis. If so, parse as millis and return.
-        if(dateString.matches(REGEX_ONLY_NUMBERS))
-            return new DateTime(new Date(Long.parseLong(dateString)));
-
-        try
-        {
-            // try to parse the string into a java.date object, if possible.
-            SimpleDateFormat dateFormat = new SimpleDateFormat(determineDateFormat(dateString));
-            dateFormat.setLenient(false);
-            return new DateTime(dateFormat.parse(dateString));
-        }
-        catch(Exception e)
-        {
-
-        }
-
-        return new DateTime(DateTime.parse(dateString));
-	}
-
-    public static DateTime determineDateTime(String dateString)
-            throws ParseException
-    {
-        return new DateTime(determineDate(dateString));
-    }
-
-    public static DateTime determineDateTime(String dateString, DateTimeZone theTimeZone)
-            throws ParseException
-    {
-        DateTime beforeTimeZone = determineDateTime(dateString);
-        return new DateTime(beforeTimeZone.getYear(),beforeTimeZone.getMonthOfYear(), beforeTimeZone.getDayOfMonth(), beforeTimeZone.getHourOfDay(), beforeTimeZone.getMinuteOfHour(), beforeTimeZone.getSecondOfMinute(), beforeTimeZone.getMillisOfSecond(), theTimeZone);
-    }
-
-
-    public static String getAliasForDate(String date, String prefix) throws ParseException {
-        return getAliasesForDateRange(date, null, prefix).iterator().next();
-    }
-
-    public static String getAliasForDate(DateTime date, String prefix) throws ParseException {
-        return getAliasesForDateRange(date, null, prefix).iterator().next();
-    }
-
-    public static Set<String> getAliasesForDateRange(String starDate, String endDate, String prefix)
-        throws ParseException
-    {
-        DateTime start = null;
-        DateTime end = null;
-        DateTimeFormatter df = ISODateTimeFormat.dateTimeNoMillis();
-        try {
-            start = df.parseDateTime(starDate);
-        } catch (Exception e) {
-            //do nothing. try to parse with other parsers
-        }
-        if(start == null) {
-            start = determineDateTime(starDate);
-        }
-        if(endDate != null) {
-            try {
-                end = df.parseDateTime(endDate);
-            } catch (Exception e) {
-                //do nothing. try to parse with other parsers
-            }
-            if( end == null)
-                end = determineDateTime(endDate);
-        }
-        return getAliasesForDateRange(start, end, prefix);
-    }
-
-    public static Set<String> getAliasesForDateRange(DateTime startDate, DateTime endDate, String prefix) {
-        Set<String> aliases = new HashSet<String>();
-        aliases.add(prefix+"_"+getDateAbbreviation(startDate.getYear(), startDate.getMonthOfYear()));
-        if(endDate == null) {
-            return aliases;
-        }
-        while(endDate.isAfter(startDate)) {
-            aliases.add(prefix+"_"+getDateAbbreviation(endDate.getYear(), endDate.getMonthOfYear()));
-            endDate = endDate.minusMonths(1);
-        }
-        return aliases;
-    }
-
-    private static String getDateAbbreviation(int year, int month) {
-        if(month > 9) {
-            return Integer.toString(year)+Integer.toString(month);
-        }
-        else {
-            return Integer.toString(year)+"0"+Integer.toString(month);
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java b/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
index 1972bc7..2d129de 100644
--- a/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/GuidUtils.java
@@ -29,20 +29,26 @@ import java.nio.charset.Charset;
  */
 public class GuidUtils {
 
-    private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+  private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
 
-    public static String generateGuid(String... parts) {
+  /**
+   * Builds a guid by hashing the in-order concatenation of the given parts.
+   * @param parts non-null, non-empty strings; a null or empty part is rejected by the Preconditions checks
+   * @return hex string of the hash code (NOTE(review): goodFastHash may not be stable across JVM runs - confirm)
+   */
+  public static String generateGuid(String... parts) {
 
-        StringBuilder seed = new StringBuilder();
-        for( String part : parts ) {
-            Preconditions.checkNotNull(part);
-            Preconditions.checkArgument(!Strings.isNullOrEmpty(part));
-            seed.append(part);
-        }
+    StringBuilder seed = new StringBuilder();
 
-        String hash = Hashing.goodFastHash(24).hashString(seed, UTF8_CHARSET).asBytes().toString();
+    for ( String part : parts ) {
+      Preconditions.checkNotNull(part);
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(part));
+      seed.append(part);
+    }
 
-        return hash;
+    String hash = Hashing.goodFastHash(24).hashString(seed, UTF8_CHARSET).toString();
 
-    }
+    return hash;
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
index de324d2..ba22d3d 100644
--- a/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/SerializationUtil.java
@@ -20,7 +20,11 @@ package org.apache.streams.util;
 
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 /**
  * SerializationUtil contains methods for serializing, deserializing, and cloning
@@ -28,47 +32,62 @@ import java.io.*;
  */
 public class SerializationUtil {
 
-    /**
-     * BORROwED FROM APACHE STORM PROJECT
-     * @param obj
-     * @return
-     */
-    public static byte[] serialize(Object obj) {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(bos);
-            oos.writeObject(obj);
-            oos.close();
-            return bos.toByteArray();
-        } catch(IOException ioe) {
-            throw new RuntimeException(ioe);
-        }
+  /**
+   * serialize Object as byte array.
+   *
+   * <p/>
+   * BORROWED FROM APACHE STORM PROJECT
+   *
+   * @param obj Object
+   * @return byte[]
+   */
+  public static byte[] serialize(Object obj) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(bos);
+      oos.writeObject(obj);
+      oos.close();
+      return bos.toByteArray();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
     }
+  }
 
-    /**
-     * BORROwED FROM APACHE STORM PROJECT
-     * @param serialized
-     * @return
-     */
-    public static Object deserialize(byte[] serialized) {
-        try {
-            ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-            ObjectInputStream ois = new ClassLoaderObjectInputStream(classLoader, bis);
-            Object ret = ois.readObject();
-            ois.close();
-            return ret;
-        } catch(IOException ioe) {
-            throw new RuntimeException(ioe);
-        } catch(ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
+  /**
+   * deserialize byte array as Object.
+   *
+   * <p/>
+   * BORROWED FROM APACHE STORM PROJECT
+   *
+   * @param serialized byte[]
+   * @return Object
+   */
+  public static Object deserialize(byte[] serialized) {
+    try {
+      ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      ObjectInputStream ois = new ClassLoaderObjectInputStream(classLoader, bis);
+      Object ret = ois.readObject();
+      ois.close();
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    } catch (ClassNotFoundException ex) {
+      throw new RuntimeException(ex);
     }
+  }
 
-
-    public static <T> T cloneBySerialization(T obj) {
-        if( obj != null )
-            return (T) deserialize(serialize(obj));
-        else return null;
+  /**
+   * clone Object by serialization.
+   * @param obj Object
+   * @param <T> type
+   * @return cloned Object
+   */
+  public static <T> T cloneBySerialization(T obj) {
+    if ( obj != null ) {
+      return (T) deserialize(serialize(obj));
+    } else {
+      return null;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
index 7fbfc6b..3dc3e08 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
@@ -22,61 +22,64 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class AbstractBackOffStrategy implements BackOffStrategy {
 
-    private long baseSleepTime;
-    private long lastSleepTime;
-    private int maxAttempts;
-    private AtomicInteger attemptsCount;
+  private long baseSleepTime;
+  private long lastSleepTime;
+  private int maxAttempts;
+  private AtomicInteger attemptsCount;
 
-    /**
-     * A BackOffStrategy that can effectively be used endlessly.
-     * @param baseBackOffTime amount of time back of in seconds
-     */
-    public AbstractBackOffStrategy(long baseBackOffTime) {
-        this(baseBackOffTime, -1);
-    }
+  /**
+   * A BackOffStrategy that can effectively be used endlessly.
+   * @param baseBackOffTime amount of time to back off in seconds
+   */
+  public AbstractBackOffStrategy(long baseBackOffTime) {
+    this(baseBackOffTime, -1);
+  }
 
-    /**
-     * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
-     * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
-     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
-     */
-    public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
-        if(baseBackOffTime <= 0) {
-            throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : "+baseBackOffTime);
-        }
-        if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts != -1) {
-            throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : "+maximumNumberOfBackOffAttempts);
-        }
-        this.baseSleepTime = baseBackOffTime;
-        this.maxAttempts = maximumNumberOfBackOffAttempts;
-        this.attemptsCount = new AtomicInteger(0);
+  /**
+   * A BackOffStrategy that has a limited number of uses before it throws a
+   * {@link org.apache.streams.util.api.requests.backoff.BackOffException}.
+   * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
+   * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be greater than 0 or -1.
+   *                                       -1 indicates there is no maximum number of attempts.
+   */
+  public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
+    if (baseBackOffTime <= 0) {
+      throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : " + baseBackOffTime);
     }
-
-    @Override
-    public void backOff() throws BackOffException {
-        int attempt = this.attemptsCount.getAndIncrement();
-        if(attempt >= this.maxAttempts && this.maxAttempts != -1) {
-            throw new BackOffException(attempt, this.lastSleepTime);
-        } else {
-            try {
-                Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime));
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
+    if (maximumNumberOfBackOffAttempts <= 0 && maximumNumberOfBackOffAttempts != -1) {
+      throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : " + maximumNumberOfBackOffAttempts);
     }
+    this.baseSleepTime = baseBackOffTime;
+    this.maxAttempts = maximumNumberOfBackOffAttempts;
+    this.attemptsCount = new AtomicInteger(0);
+  }
 
-    @Override
-    public void reset() {
-        this.attemptsCount.set(0);
+  @Override
+  public void backOff() throws BackOffException {
+    int attempt = this.attemptsCount.getAndIncrement();
+    if (attempt >= this.maxAttempts && this.maxAttempts != -1) {
+      throw new BackOffException(attempt, this.lastSleepTime);
+    } else {
+      try {
+        Thread.sleep(this.lastSleepTime = calculateBackOffTime(attempt, this.baseSleepTime));
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
     }
+  }
+
+  @Override
+  public void reset() {
+    this.attemptsCount.set(0);
+  }
 
-    /**
-     * Calculate the amount of time in milliseconds that the strategy should back off for
-     * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
-     * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
-     * @return the amount of time it should back off in milliseconds
-     */
-    protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
+  /**
+   * Calculate the amount of time in milliseconds that the strategy should back off for
+   * @param attemptCount the number of attempts the strategy has backed off.
+   *                     i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
+   * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
+   * @return the amount of time it should back off in milliseconds
+   */
+  protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
index 223303c..692c0b6 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
@@ -21,43 +21,49 @@ package org.apache.streams.util.api.requests.backoff;
  */
 public class BackOffException extends Exception {
 
-    private int attemptCount;
-    private long sleepTime;
-
-    public BackOffException() {
-        this(-1, -1);
-    }
-
-    public BackOffException(String message) {
-        this(message, -1, -1);
-    }
-
-    public BackOffException(int attemptCount, long maxSleepTime) {
-        this.attemptCount = attemptCount;
-        this.sleepTime = maxSleepTime;
-    }
-
-    public BackOffException(String message, int attemptCount, long maxSleepTime) {
-        super(message);
-        this.attemptCount = attemptCount;
-        this.sleepTime = maxSleepTime;
-    }
-
-    /**
-     * Gets the number of back off attempts that happened before the exception was thrown. If the function that
-     * initialized this exception does not set the number of attempts, -1 will be returned.
-     * @return number of attempts
-     */
-    public int getNumberOfBackOffsAttempted() {
-        return this.attemptCount;
-    }
-
-    /**
-     * Gets the longest sleep period that the strategy attempted. If the function that
-     * initialized this exception does not set the longest sleep period, -1 will be returned.
-     * @return
-     */
-    public long getLongestBackOff() {
-        return this.sleepTime;
-    }
+  private int attemptCount;
+  private long sleepTime;
+
+  public BackOffException() {
+    this(-1, -1);
+  }
+
+  public BackOffException(String message) {
+    this(message, -1, -1);
+  }
+
+  public BackOffException(int attemptCount, long maxSleepTime) {
+    this.attemptCount = attemptCount;
+    this.sleepTime = maxSleepTime;
+  }
+
+  /**
+   * BackOffException constructor.
+   * @param message message
+   * @param attemptCount attemptCount
+   * @param maxSleepTime maxSleepTime (in millis)
+   */
+  public BackOffException(String message, int attemptCount, long maxSleepTime) {
+    super(message);
+    this.attemptCount = attemptCount;
+    this.sleepTime = maxSleepTime;
+  }
+
+  /**
+   * Gets the number of back off attempts that happened before the exception was thrown. If the function that
+   * initialized this exception does not set the number of attempts, -1 will be returned.
+   * @return number of back off attempts
+   */
+  public int getNumberOfBackOffsAttempted() {
+    return this.attemptCount;
+  }
+
+  /**
+   * Gets the longest sleep period that the strategy attempted. If the function that
+   * initialized this exception does not set the longest sleep period, -1 will be returned.
+   * @return longest sleep period that the strategy attempted
+   */
+  public long getLongestBackOff() {
+    return this.sleepTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
index a0d80e8..44497ab 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
@@ -19,6 +19,7 @@ package org.apache.streams.util.api.requests.backoff;
  * BackOffStrategy will cause the current thread to sleep for a specific amount of time. This is used to adhere to
  * api rate limits.
  *
+ * <p/>
  * The example below illustrates using a BackOffStrategy to slow down requests when you hit a rate limit exception.
  *
  * <code>
@@ -36,16 +37,17 @@ package org.apache.streams.util.api.requests.backoff;
  */
 public interface BackOffStrategy {
 
-    /**
-     * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set
-     * on the number of times the backOff can be called, an exception will be thrown.
-     * @throws BackOffException
-     */
-    public void backOff() throws BackOffException;
+  /**
+   * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set
+   * on the number of times the backOff can be called, an exception will be thrown.
+   * @throws BackOffException BackOffException
+   */
+  public void backOff() throws BackOffException;
 
-    /**
-     * Rests the back off strategy to its original state.  After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()}
-     * has never been called.
-     */
-    public void reset();
+  /**
+   * Resets the back off strategy to its original state.
+   * After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()}
+   * has never been called.
+   */
+  public void reset();
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
index b3fd3f2..26ec225 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
@@ -24,25 +24,27 @@ import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
  */
 public class ConstantTimeBackOffStrategy extends AbstractBackOffStrategy {
 
-    /**
-     * A ConstantTimeBackOffStrategy that can effectively be used endlessly.
-     * @param baseBackOffTimeInMiliseconds amount of time back of in milliseconds
-     */
-    public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) {
-        this(baseBackOffTimeInMiliseconds, -1);
-    }
+  /**
+   * A ConstantTimeBackOffStrategy that can effectively be used endlessly.
+   * @param baseBackOffTimeInMiliseconds amount of time to back off in milliseconds
+   */
+  public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) {
+    this(baseBackOffTimeInMiliseconds, -1);
+  }
 
-    /**
-     * A ConstantTimeBackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
-     * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0.
-     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
-     */
-    public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) {
-        super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts);
-    }
+  /**
+   * A ConstantTimeBackOffStrategy that has a limited number of uses before it
+   * throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
+   * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0.
+   * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be greater than 0 or -1.
+   *                                       -1 indicates there is no maximum number of attempts.
+   */
+  public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) {
+    super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts);
+  }
 
-    @Override
-    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
-        return baseSleepTime;
-    }
+  @Override
+  protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+    return baseSleepTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
index a5a9656..0962984 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
@@ -18,30 +18,29 @@ package org.apache.streams.util.api.requests.backoff.impl;
 import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
 
 /**
- * Exponential backk strategy.  Caluclated by baseBackOffTimeInSeconds raised the attempt-count power.
+ * Exponential backoff strategy.  Calculated by baseBackOffTimeInSeconds raised to the attempt-count power.
  */
 public class ExponentialBackOffStrategy extends AbstractBackOffStrategy {
 
+  /**
+   * Unlimited use ExponentialBackOffStrategy.
+   * @param baseBackOffTimeInSeconds baseBackOffTimeInSeconds
+   */
+  public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) {
+    this(baseBackOffTimeInSeconds, -1);
+  }
 
-    /**
-     * Unlimited use ExponentialBackOffStrategy
-     * @param baseBackOffTimeInSeconds
-     */
-    public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) {
-        this(baseBackOffTimeInSeconds, -1);
-    }
+  /**
+   * Limited use ExponentialBackOffStrategy.
+   * @param baseBackOffTimeInSeconds baseBackOffTimeInSeconds
+   * @param maxNumAttempts maxNumAttempts
+   */
+  public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) {
+    super(baseBackOffTimeInSeconds, maxNumAttempts);
+  }
 
-    /**
-     * Limited use ExponentialBackOffStrategy
-     * @param baseBackOffTimeInSeconds
-     * @param maxNumAttempts
-     */
-    public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) {
-        super(baseBackOffTimeInSeconds, maxNumAttempts);
-    }
-
-    @Override
-    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
-        return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000;
-    }
+  @Override
+  protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+    return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
index 38d05a1..d6f323f 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
@@ -24,17 +24,16 @@ import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
  */
 public class LinearTimeBackOffStrategy extends AbstractBackOffStrategy {
 
+  public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {
+    this(baseBackOffTimeInSeconds, -1);
+  }
 
-    public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {
-        this(baseBackOffTimeInSeconds, -1);
-    }
+  public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) {
+    super(baseBackOffTimeInSeconds, maxAttempts); // forward the caller's limit (was hard-coded to -1 = unlimited)
+  }
 
-    public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) {
-        super(baseBackOffTimeInSeconds, -1);
-    }
-
-    @Override
-    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
-        return 1000L * attemptCount * baseSleepTime;
-    }
+  @Override
+  protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+    return 1000L * attemptCount * baseSleepTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
index dfdec72..41ec4b6 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
@@ -12,22 +12,23 @@ software distributed under the License is distributed on an
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License. */
+
 package org.apache.streams.util.oauth.tokens;
 
 /**
- *
+ * AbstractOauthToken.
  */
 public abstract class AbstractOauthToken {
 
-    /**
-     * Must create equals method for all OauthTokens.
-     * @param o
-     * @return true if equal, and false otherwise
-     */
-    protected abstract boolean internalEquals(Object o);
+  /**
+   * Must create equals method for all OauthTokens.
+   * @param object object for comparison
+   * @return true if equal, and false otherwise
+   */
+  protected abstract boolean internalEquals(Object object);
 
-    @Override
-    public boolean equals(Object o) {
-        return this.internalEquals(o);
-    }
+  @Override
+  public boolean equals(Object object) {
+    return this.internalEquals(object);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
index fed194f..7b3f370 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -12,42 +12,40 @@ software distributed under the License is distributed on an
 KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License. */
-package org.apache.streams.util.oauth.tokens.tokenmanager;
 
-import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
+package org.apache.streams.util.oauth.tokens.tokenmanager;
 
 import java.util.Collection;
 
 /**
- * Manges access to oauth tokens.  Allows a caller to add tokens to the token pool and receive an available token.
+ * Manages access to oauth tokens.  Allows a caller to add tokens to the token pool and receive an available token.
  */
 public interface SimpleTokenManager<T> {
 
-
-    /**
-     * Adds a token to the available token pool.
-     * @param token Token to be added
-     * @return true, if token was successfully added to the pool and false otherwise.
-     */
-    public boolean addTokenToPool(T token);
-
-    /**
-     * Adds a {@link java.util.Collection} of tokens to the available token pool.
-     * @param tokens Tokens to be added
-     * @return true, if the token pool size increased after adding the tokens, and false otherwise.
-     */
-    public boolean addAllTokensToPool(Collection<T> tokens);
-
-    /**
-     * Get an available token. If no tokens are available it returns null.
-     * @return next available token
-     */
-    public T getNextAvailableToken();
-
-    /**
-     * Get the number of available tokens
-     * @return number of available tokens
-     */
-    public int numAvailableTokens();
+  /**
+   * Adds a token to the available token pool.
+   * @param token Token to be added
+   * @return true, if token was successfully added to the pool and false otherwise.
+   */
+  public boolean addTokenToPool(T token);
+
+  /**
+   * Adds a {@link java.util.Collection} of tokens to the available token pool.
+   * @param tokens Tokens to be added
+   * @return true, if the token pool size increased after adding the tokens, and false otherwise.
+   */
+  public boolean addAllTokensToPool(Collection<T> tokens);
+
+  /**
+   * Get an available token. If no tokens are available it returns null.
+   * @return next available token
+   */
+  public T getNextAvailableToken();
+
+  /**
+   * Get the number of available tokens.
+   * @return number of available tokens
+   */
+  public int numAvailableTokens();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java
new file mode 100644
index 0000000..7c1a9e3
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManager.java
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+
+package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
+
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Manages a pool of tokens the most basic possible way.
+ * If all tokens are added to the manager before {@link BasicTokenManager#getNextAvailableToken() getNextAvailableToken}
+ * is called tokens are issued in the order they were added to the manager, FIFO.  The BasicTokenManager acts as a circular queue
+ * of tokens.  Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again.
+ *
+ * <p/>
+ * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool.
+ *
+ * <p/>
+ * The manager class is thread safe.
+ */
+public class BasicTokenManager<T> implements SimpleTokenManager<T> {
+
+  private ArrayList<T> availableTokens;
+  private int nextToken;
+
+  public BasicTokenManager() {
+    this(null);
+  }
+
+  /**
+   * BasicTokenManager constructor.
+   * @param tokens Collection of tokens
+   */
+  public BasicTokenManager(Collection<T> tokens) {
+    if (tokens != null) {
+      this.availableTokens = new ArrayList<T>(tokens.size());
+      this.addAllTokensToPool(tokens);
+    } else {
+      this.availableTokens = new ArrayList<T>();
+    }
+    this.nextToken = 0;
+  }
+
+  @Override
+  public synchronized boolean addTokenToPool(T token) {
+    if (token == null || this.availableTokens.contains(token)) {
+      return false;
+    } else {
+      return this.availableTokens.add(token);
+    }
+  }
+
+  @Override
+  public synchronized boolean addAllTokensToPool(Collection<T> tokens) {
+    int startSize = this.availableTokens.size();
+    for (T token : tokens) {
+      this.addTokenToPool(token);
+    }
+    return startSize < this.availableTokens.size();
+  }
+
+  @Override
+  public synchronized T getNextAvailableToken() {
+    T token = null;
+    if (this.availableTokens.size() == 0) {
+      return token;
+    } else {
+      token = this.availableTokens.get(nextToken++);
+      if (nextToken == this.availableTokens.size()) {
+        nextToken = 0;
+      }
+      return token;
+    }
+  }
+
+  @Override
+  public synchronized int numAvailableTokens() {
+    return this.availableTokens.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
deleted file mode 100644
index 4c64bf7..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
-
-import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
-import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Manages a pool of tokens the most basic possible way.  If all tokens are added to the manager before {@link BasicTokenManger#getNextAvailableToken() getNextAvailableToken}
- * is called tokens are issued in the order they were added to the manager, FIFO.  The BasicTokenManager acts as a circular queue
- * of tokens.  Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again.
- *
- * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool.
- *
- * The manager class is thread safe.
- */
-public class BasicTokenManger<T> implements SimpleTokenManager<T>{
-
-    private ArrayList<T> availableTokens;
-    private int nextToken;
-
-    public BasicTokenManger() {
-        this(null);
-    }
-
-    public BasicTokenManger(Collection<T> tokens) {
-        if(tokens != null) {
-            this.availableTokens = new ArrayList<T>(tokens.size());
-            this.addAllTokensToPool(tokens);
-        } else {
-            this.availableTokens = new ArrayList<T>();
-        }
-        this.nextToken = 0;
-    }
-
-    @Override
-    public synchronized boolean addTokenToPool(T token) {
-        if(token == null || this.availableTokens.contains(token))
-            return false;
-        else
-            return this.availableTokens.add(token);
-    }
-
-    @Override
-    public synchronized boolean addAllTokensToPool(Collection<T> tokens) {
-        int startSize = this.availableTokens.size();
-        for(T token : tokens) {
-            this.addTokenToPool(token);
-        }
-        return startSize < this.availableTokens.size();
-    }
-
-    @Override
-    public synchronized T getNextAvailableToken() {
-        T token = null;
-        if(this.availableTokens.size() == 0) {
-            return token;
-        } else {
-            token = this.availableTokens.get(nextToken++);
-            if(nextToken == this.availableTokens.size()) {
-                nextToken = 0;
-            }
-            return token;
-        }
-    }
-
-    @Override
-    public synchronized int numAvailableTokens() {
-        return this.availableTokens.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java b/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
index 450851e..57a1d44 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/FieldType.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 /**
@@ -22,10 +23,10 @@ package org.apache.streams.util.schema;
  * be able to translate.
  */
 public enum FieldType {
-    STRING,
-    INTEGER,
-    NUMBER,
-    BOOLEAN,
-    OBJECT,
-    ARRAY
+  STRING,
+  INTEGER,
+  NUMBER,
+  BOOLEAN,
+  OBJECT,
+  ARRAY
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java b/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
index 6582565..a437ca4 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/FieldUtil.java
@@ -15,11 +15,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -27,25 +25,32 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
  */
 public class FieldUtil {
 
-    public static FieldType determineFieldType(ObjectNode fieldNode) {
-        String typeSchemaField = "type";
-        if( !fieldNode.has(typeSchemaField))
-            return null;
-        String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText();
-        if( typeSchemaFieldValue.equals("string")) {
-            return FieldType.STRING;
-        } else if( typeSchemaFieldValue.equals("integer")) {
-            return FieldType.INTEGER;
-        } else if( typeSchemaFieldValue.equals("number")) {
-            return FieldType.NUMBER;
-        } else if( typeSchemaFieldValue.equals("object")) {
-            return FieldType.OBJECT;
-        } else if( typeSchemaFieldValue.equals("boolean")) {
-            return FieldType.BOOLEAN;
-        } else if( typeSchemaFieldValue.equals("array")) {
-            return FieldType.ARRAY;
-        }
-        else return null;
+  /**
+   * determine FieldType from ObjectNode.
+   * @param fieldNode ObjectNode
+   * @return FieldType
+   */
+  public static FieldType determineFieldType(ObjectNode fieldNode) {
+    String typeSchemaField = "type";
+    if ( !fieldNode.has(typeSchemaField)) {
+      return null;
+    }
+    String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText();
+    if ( typeSchemaFieldValue.equals("string")) {
+      return FieldType.STRING;
+    } else if ( typeSchemaFieldValue.equals("integer")) {
+      return FieldType.INTEGER;
+    } else if ( typeSchemaFieldValue.equals("number")) {
+      return FieldType.NUMBER;
+    } else if ( typeSchemaFieldValue.equals("object")) {
+      return FieldType.OBJECT;
+    } else if ( typeSchemaFieldValue.equals("boolean")) {
+      return FieldType.BOOLEAN;
+    } else if ( typeSchemaFieldValue.equals("array")) {
+      return FieldType.ARRAY;
+    } else {
+      return null;
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java b/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
index c51339a..5acd5a8 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/FileUtil.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 import com.google.common.base.Preconditions;
@@ -35,60 +36,93 @@ import java.util.List;
  */
 public class FileUtil {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
 
-    public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) {
-        if(Strings.isNullOrEmpty(sourceDirectory))
-            return inputFile;
-        else {
-            try {
-                if( inputFile.contains(sourceDirectory) && inputFile.indexOf(sourceDirectory) > 0) {
-                    return inputFile.substring(inputFile.indexOf(sourceDirectory)+sourceDirectory.length()+1);
-                }
-            } catch( Throwable e ) {
-                return inputFile;
-            }
+  /**
+   * drop source path prefix between inputFile and sourceDirectory.
+   * @param inputFile inputFile
+   * @param sourceDirectory sourceDirectory
+   * @return without path prefix
+   */
+  public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) {
+    if (Strings.isNullOrEmpty(sourceDirectory)) {
+      return inputFile;
+    } else {
+      try {
+        if ( inputFile.contains(sourceDirectory) && inputFile.indexOf(sourceDirectory) > 0) {
+          return inputFile.substring(inputFile.indexOf(sourceDirectory) + sourceDirectory.length() + 1);
         }
+      } catch ( Throwable throwable ) {
         return inputFile;
+      }
     }
+    return inputFile;
+  }
 
-    public static String swapExtension(String inputFile, String originalExtension, String newExtension) {
-        if(inputFile.endsWith("."+originalExtension))
-            return inputFile.replace("."+originalExtension, "."+newExtension);
-        else return inputFile;
+  /**
+   * swapExtension.
+   * @param inputFile inputFile
+   * @param originalExtension originalExtension
+   * @param newExtension newExtension
+   * @return extension swapped
+   */
+  public static String swapExtension(String inputFile, String originalExtension, String newExtension) {
+    if (inputFile.endsWith("." + originalExtension)) {
+      return inputFile.replace("." + originalExtension, "." + newExtension);
+    } else {
+      return inputFile;
     }
+  }
 
-    public static String dropExtension(String inputFile) {
-        if(inputFile.contains("."))
-            return inputFile.substring(0, inputFile.lastIndexOf("."));
-        else return inputFile;
+  /**
+   * dropExtension.
+   * @param inputFile inputFile
+   * @return extension dropped
+   */
+  public static String dropExtension(String inputFile) {
+    if (inputFile.contains(".")) {
+      return inputFile.substring(0, inputFile.lastIndexOf("."));
+    } else {
+      return inputFile;
     }
+  }
 
-    public static void writeFile(String resourceFile, String resourceContent) {
-        try {
-            File path = new File(resourceFile);
-            File dir = path.getParentFile();
-            if( !dir.exists() )
-                dir.mkdirs();
-            Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW);
-        } catch (Exception e) {
-            LOGGER.error("Write Exception: {}", e);
-        }
+  /**
+   * writeFile.
+   * @param resourceFile resourceFile
+   * @param resourceContent resourceContent
+   */
+  public static void writeFile(String resourceFile, String resourceContent) {
+    try {
+      File path = new File(resourceFile);
+      File dir = path.getParentFile();
+      if ( !dir.exists() ) {
+        dir.mkdirs();
+      }
+      Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW);
+    } catch (Exception ex) {
+      LOGGER.error("Write Exception: {}", ex);
     }
+  }
 
-    public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) {
-
-        Preconditions.checkArgument(schemaFiles.size() > 0);
-        int i = 0;
-        while( schemaFiles.size() > i) {
-            File child = schemaFiles.get(i);
-            if (child.isDirectory()) {
-                schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter())));
-                schemaFiles.remove(child);
-            } else {
-                i += 1;
-            }
-        }
+  /**
+   * resolveRecursive.
+   * @param config GenerationConfig
+   * @param schemaFiles List of schemaFiles
+   */
+  public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) {
 
+    Preconditions.checkArgument(schemaFiles.size() > 0);
+    int index = 0;
+    while ( schemaFiles.size() > index) {
+      File child = schemaFiles.get(index);
+      if (child.isDirectory()) {
+        schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter())));
+        schemaFiles.remove(child);
+      } else {
+        index += 1;
+      }
     }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java b/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
index c48d186..d7fa7e7 100644
--- a/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
+++ b/streams-util/src/main/java/org/apache/streams/util/schema/GenerationConfig.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.util.schema;
 
 import java.io.File;
@@ -24,110 +25,39 @@ import java.util.Iterator;
 
 /**
  * GenerationConfig represents the common fields and field accessors for
- * streams modules that transform schemas into generated-sources or generated-resources
+ * streams modules that transform schemas into generated-sources or generated-resources.
  */
 public interface GenerationConfig {
 
-    /**
-     * Gets the 'source' configuration option.
-     *
-     * @return The source file(s) or directory(ies) from which JSON Schema will
-     *         be read.
-     */
-    Iterator<URL> getSource();
-
-    /**
-     * Gets the 'targetDirectory' configuration option.
-     *
-     * @return The target directory into which generated types will be written
-     *         (may or may not exist before types are written)
-     */
-    File getTargetDirectory();
-
-    /**
-     * Gets the 'outputEncoding' configuration option.
-     *
-     * @return The character encoding that should be used when writing output files.
-     */
-    String getOutputEncoding();
-
-    /**
-     * Gets the file filter used to isolate the schema mapping files in the
-     * source directories.
-     *
-     * @return the file filter use when scanning for schema files.
-     */
-    FileFilter getFileFilter();
-
-    /**
-     * Gets the 'includeAdditionalProperties' configuration option.
-     *
-     * @return Whether to allow 'additional properties' support in objects.
-     *         Setting this to false will disable additional properties support,
-     *         regardless of the input schema(s).
-     */
-//    boolean isIncludeAdditionalProperties();
-
-    /**
-     * Gets the 'targetVersion' configuration option.
-     *
-     *  @return The target version for generated source files.
-     */
-//    String getTargetVersion();
-
-//    /**
-//     * Gets the `includeDynamicAccessors` configuraiton option.
-//     *
-//     * @return Whether to include dynamic getters, setters, and builders
-//     *         or to omit these methods.
-//     */
-//    boolean isIncludeDynamicAccessors();
-
-//    /**
-//     * Gets the `dateTimeType` configuration option.
-//     *         <p>
-//     *         Example values:
-//     *         <ul>
-//     *         <li><code>org.joda.time.LocalDateTime</code> (Joda)</li>
-//     *         <li><code>java.time.LocalDateTime</code> (JSR310)</li>
-//     *         <li><code>null</code> (default behavior)</li>
-//     *         </ul>
-//     *
-//     * @return The java type to use instead of {@link java.util.Date}
-//     *         when adding date type fields to generate Java types.
-//     */
-//    String getDateTimeType();
-//
-//    /**
-//     * Gets the `dateType` configuration option.
-//     *         <p>
-//     *         Example values:
-//     *         <ul>
-//     *         <li><code>org.joda.time.LocalDate</code> (Joda)</li>
-//     *         <li><code>java.time.LocalDate</code> (JSR310)</li>
-//     *         <li><code>null</code> (default behavior)</li>
-//     *         </ul>
-//     *
-//     * @return The java type to use instead of string
-//     *         when adding string type fields with a format of date (not
-//     *         date-time) to generated Java types.
-//     */
-//    String getDateType();
-//
-//    /**
-//     * Gets the `timeType` configuration option.
-//     *         <p>
-//     *         Example values:
-//     *         <ul>
-//     *         <li><code>org.joda.time.LocalTime</code> (Joda)</li>
-//     *         <li><code>java.time.LocalTime</code> (JSR310)</li>
-//     *         <li><code>null</code> (default behavior)</li>
-//     *         </ul>
-//     *
-//     * @return The java type to use instead of string
-//     *         when adding string type fields with a format of time (not
-//     *         date-time) to generated Java types.
-//     */
-//    String getTimeType();
+  /**
+   * Gets the 'source' configuration option.
+   *
+   * @return The source file(s) or directory(ies) from which JSON Schema will
+   *         be read.
+   */
+  Iterator<URL> getSource();
+
+  /**
+   * Gets the 'targetDirectory' configuration option.
+   *
+   * @return The target directory into which generated types will be written
+   *         (may or may not exist before types are written)
+   */
+  File getTargetDirectory();
+
+  /**
+   * Gets the 'outputEncoding' configuration option.
+   *
+   * @return The character encoding that should be used when writing output files.
+   */
+  String getOutputEncoding();
+
+  /**
+   * Gets the file filter used to isolate the schema mapping files in the
+   * source directories.
+   *
+   * @return the file filter used when scanning for schema files.
+   */
+  FileFilter getFileFilter();
 
 }