You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:24:56 UTC

[15/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
index e4d883d..35dbcd5 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/ThroughputQueueDeserializer.java
@@ -15,73 +15,78 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.jackson;
 
+import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 
 public class ThroughputQueueDeserializer extends JsonDeserializer<ThroughputQueueBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
 
-    public ThroughputQueueDeserializer() {
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThroughputQueueDeserializer.class);
 
-    }
+  public ThroughputQueueDeserializer() {
 
-    @Override
-    public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+  }
 
-            ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+  @Override
+  public ThroughputQueueBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+    try {
+      MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            throughputQueueBroadcast.setName(name.toString());
+      ThroughputQueueBroadcast throughputQueueBroadcast = new ThroughputQueueBroadcast();
+      JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
 
-            for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                try {
-                    switch(attribute.getName()) {
-                        case "CurrentSize":
-                            throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "AvgWait":
-                            throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "MaxWait":
-                            throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Removed":
-                            throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Added":
-                            throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Throughput":
-                            throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", e);
-                }
-            }
+      ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+      MBeanInfo info = server.getMBeanInfo(name);
+      throughputQueueBroadcast.setName(name.toString());
 
-            return throughputQueueBroadcast;
-        } catch (Exception e) {
-            return null;
+      for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
+        try {
+          switch (attribute.getName()) {
+            case "CurrentSize":
+              throughputQueueBroadcast.setCurrentSize((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "AvgWait":
+              throughputQueueBroadcast.setAvgWait((double) server.getAttribute(name, attribute.getName()));
+              break;
+            case "MaxWait":
+              throughputQueueBroadcast.setMaxWait((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "Removed":
+              throughputQueueBroadcast.setRemoved((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "Added":
+              throughputQueueBroadcast.setAdded((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "Throughput":
+              throughputQueueBroadcast.setThroughput((double) server.getAttribute(name, attribute.getName()));
+              break;
+            default:
+              break;
+          }
+        } catch (Exception ex) {
+          LOGGER.error("Exception while trying to deserialize ThroughputQueueBroadcast object: {}", ex);
         }
+      }
+
+      return throughputQueueBroadcast;
+    } catch (Exception ex) {
+      return null;
     }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
index 28c7fa7..667e9f6 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/MessagePersister.java
@@ -15,20 +15,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.persist;
 
 import java.util.List;
 
 /**
- * Interface to define how we persist messages (JMX/monitoring related)
+ * Interface to define how we persist messages (JMX/monitoring related).
  */
 public interface MessagePersister {
 
-    /**
-     * Given a list of messages, persist them out through whatever appropriate
-     * broadcast mechanism (HTTP request, SLF4J log, etc.)
-     * @param messages
-     * @return statusCode represents whether or not the persist was successful
-     */
-    int persistMessages(List<String> messages);
+  /**
+   * Given a list of messages, persist them out through whatever appropriate
+   * broadcast mechanism (HTTP request, SLF4J log, etc.).
+   * @param messages List of String messages
+   * @return statusCode represents whether or not the persist was successful
+   */
+  int persistMessages(List<String> messages);
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
index bf0591f..1466f31 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersister.java
@@ -15,8 +15,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.persist.impl;
 
+import org.apache.streams.monitoring.persist.MessagePersister;
+
 import com.google.common.collect.Lists;
 import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
@@ -25,70 +28,70 @@ import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.message.BasicNameValuePair;
-import org.apache.streams.monitoring.persist.MessagePersister;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 public class BroadcastMessagePersister implements MessagePersister {
-    private final static Logger LOGGER = LoggerFactory.getLogger(BroadcastMessagePersister.class);
-    private String broadcastURI;
 
-    public BroadcastMessagePersister(String broadcastURI) {
-        this.broadcastURI = broadcastURI;
-    }
+  private static final Logger LOGGER = LoggerFactory.getLogger(BroadcastMessagePersister.class);
+  private String broadcastUri;
 
-    @Override
-    /**
-     * Given a list of messages as Strings, broadcast them to the broadcastURI
-     * (if one is defined)
-     * @param messages
-     * @return int status code from POST response
-     */
-    public int persistMessages(List<String> messages) {
-        int responseCode = -1;
+  public BroadcastMessagePersister(String broadcastUri) {
+    this.broadcastUri = broadcastUri;
+  }
 
-        if(broadcastURI != null) {
-            try {
-                HttpClient client = HttpClients.createDefault();
-                HttpPost post = new HttpPost(broadcastURI);
+  @Override
+  /**
+   * Given a list of messages as Strings, broadcast them to the broadcastUri
+   * (if one is defined)
+   * @param messages
+   * @return int status code from POST response
+   */
+  public int persistMessages(List<String> messages) {
+    int responseCode = -1;
 
-                post.setHeader("User-Agent", "Streams");
+    if (broadcastUri != null) {
+      try {
+        HttpClient client = HttpClients.createDefault();
+        HttpPost post = new HttpPost(broadcastUri);
 
-                List<NameValuePair> urlParameters = Lists.newArrayList();
-                urlParameters.add(new BasicNameValuePair("messages", serializeMessages(messages)));
+        post.setHeader("User-Agent", "Streams");
 
-                post.setEntity(new UrlEncodedFormEntity(urlParameters, "UTF-8"));
+        List<NameValuePair> urlParameters = Lists.newArrayList();
+        urlParameters.add(new BasicNameValuePair("messages", serializeMessages(messages)));
 
-                HttpResponse response = client.execute(post);
-                responseCode = response.getStatusLine().getStatusCode();
+        post.setEntity(new UrlEncodedFormEntity(urlParameters, "UTF-8"));
 
-                LOGGER.debug("Broadcast {} messages to URI: {}", messages.size(), broadcastURI);
-            } catch (Exception e) {
-                LOGGER.error("Failed to broadcast message to URI: {}, exception: {}", broadcastURI, e);
-            }
-        }
+        HttpResponse response = client.execute(post);
+        responseCode = response.getStatusLine().getStatusCode();
 
-        return responseCode;
+        LOGGER.debug("Broadcast {} messages to URI: {}", messages.size(), broadcastUri);
+      } catch (Exception ex) {
+        LOGGER.error("Failed to broadcast message to URI: {}, exception: {}", broadcastUri, ex);
+      }
     }
 
-    /**
-     * Given a List of String messages, convert them to a JSON array
-     * @param messages
-     * @return Serialized version of this JSON array
-     */
-    private String serializeMessages(List<String> messages) {
-        String ser = "{\"messages\":[";
-
-        for(String message : messages) {
-            if(messages.get(messages.size()-1).equals(message)) {
-                ser += message + "]}";
-            } else {
-                ser += message + ",";
-            }
-        }
-
-        return ser;
+    return responseCode;
+  }
+
+  /**
+   * Given a List of String messages, convert them to a JSON array.
+   * @param messages List of String messages
+   * @return Serialized version of this JSON array
+   */
+  private String serializeMessages(List<String> messages) {
+    String ser = "{\"messages\":[";
+
+    for (String message : messages) {
+      if (messages.get(messages.size() - 1).equals(message)) {
+        ser += message + "]}";
+      } else {
+        ser += message + ",";
+      }
     }
+
+    return ser;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
index 312502c..c697661 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
@@ -15,90 +15,93 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.persist.impl;
 
 import org.apache.streams.monitoring.persist.MessagePersister;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.concurrent.Executors;
 
 public class LogstashUdpMessagePersister implements MessagePersister {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class);
-    private String broadcastURI;
-    URI uri;
+  private static final Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class);
+  private String broadcastUri;
+  URI uri;
+
+  public LogstashUdpMessagePersister(String broadcastUri) {
+    this.broadcastUri = broadcastUri;
+    setup();
+  }
 
-    public LogstashUdpMessagePersister(String broadcastURI) {
-        this.broadcastURI = broadcastURI;
-        setup();
+  /**
+   * setup.
+   */
+  public void setup() {
+
+    try {
+      uri = new URI(broadcastUri);
+    } catch (URISyntaxException ex) {
+      LOGGER.error(ex.getMessage());
     }
 
-    public void setup() {
+  }
 
-        try {
-            uri = new URI(broadcastURI);
-        } catch (URISyntaxException e) {
-            LOGGER.error(e.getMessage());
-        }
+  @Override
+  /**
+   * Given a list of messages as Strings, send them in a single UDP datagram
+   * to the host and port of the broadcastUri (if one is defined).
+   * @param messages List of String messages
+   * @return always -1; no response code is produced by the UDP send
+   */
+  public int persistMessages(List<String> messages) {
+    int responseCode = -1;
 
-    }
-    @Override
-    /**
-     * Given a list of messages as Strings, broadcast them to the broadcastURI
-     * (if one is defined)
-     * @param messages
-     * @return int status code from POST response
-     */
-    public int persistMessages(List<String> messages) {
-        int responseCode = -1;
-
-        if(broadcastURI != null) {
-            DatagramSocket socket = null;
-            try {
-                socket = new DatagramSocket();
-            } catch (SocketException e) {
-                LOGGER.error("Metrics Broadcast Setup Failed: " + e.getMessage());
-            }
-            try {
-                ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes());
-                byte[] byteArray = toWrite.array();
-                DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length);
-                socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
-                socket.send(packet);
-            } catch( Exception e ) {
-                LOGGER.error("Metrics Broadcast Failed: " + e.getMessage());
-            } finally {
-                socket.close();
-            }
-        }
-
-        return responseCode;
+    if (broadcastUri != null) {
+      DatagramSocket socket = null;
+      try {
+        socket = new DatagramSocket();
+      } catch (SocketException ex) {
+        LOGGER.error("Metrics Broadcast Setup Failed: " + ex.getMessage());
+      }
+      try {
+        ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes());
+        byte[] byteArray = toWrite.array();
+        DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length);
+        socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
+        socket.send(packet);
+      } catch ( Exception ex ) {
+        LOGGER.error("Metrics Broadcast Failed: " + ex.getMessage());
+      } finally {
+        socket.close();
+      }
     }
 
-    /**
-     * Given a List of String messages, convert them to a JSON array
-     * @param messages
-     * @return Serialized version of this JSON array
-     */
-    private String serializeMessages(List<String> messages) {
+    return responseCode;
+  }
 
-        StringBuilder json_lines = new StringBuilder();
-        for(String message : messages) {
-            json_lines.append(message).append('\n');
-        }
+  /**
+   * Given a List of String messages, join them into newline-delimited text.
+   * @param messages List of String messages
+   * @return the messages concatenated in order, each followed by a newline
+   */
+  private String serializeMessages(List<String> messages) {
 
-        return json_lines.toString();
+    StringBuilder jsonLines = new StringBuilder();
+    for (String message : messages) {
+      jsonLines.append(message).append('\n');
     }
 
+    return jsonLines.toString();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java
deleted file mode 100644
index 19c36f2..0000000
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/SLF4JMessagePersister.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.monitoring.persist.impl;
-
-import org.apache.streams.monitoring.persist.MessagePersister;
-import org.slf4j.Logger;
-
-import java.util.List;
-
-public class SLF4JMessagePersister implements MessagePersister {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(SLF4JMessagePersister.class);
-    private static final int SUCCESS_STATUS = 0;
-    private static final int FAILURE_STATUS = -1;
-
-    public SLF4JMessagePersister() {
-
-    }
-
-    @Override
-    public int persistMessages(List<String> messages) {
-        for(String message : messages) {
-            LOGGER.info(message);
-        }
-
-        return SUCCESS_STATUS;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java
new file mode 100644
index 0000000..b237871
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/Slf4jMessagePersister.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.monitoring.persist.impl;
+
+import org.apache.streams.monitoring.persist.MessagePersister;
+
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Persist monitoring messages to SLF4J.
+ */
+public class Slf4jMessagePersister implements MessagePersister {
+
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Slf4jMessagePersister.class);
+  private static final int SUCCESS_STATUS = 0;
+  private static final int FAILURE_STATUS = -1;
+
+  public Slf4jMessagePersister() {
+
+  }
+
+  @Override
+  public int persistMessages(List<String> messages) {
+
+    for (String message : messages) {
+      LOGGER.info(message);
+    }
+
+    return SUCCESS_STATUS;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index f21a212..a797ce5 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -15,189 +15,204 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.tasks;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.Lists;
 import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.jackson.*;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.jackson.DatumStatusCounterDeserializer;
+import org.apache.streams.jackson.MemoryUsageDeserializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.jackson.StreamsTaskCounterDeserializer;
+import org.apache.streams.jackson.ThroughputQueueDeserializer;
 import org.apache.streams.local.monitoring.MonitoringConfiguration;
 import org.apache.streams.monitoring.persist.MessagePersister;
 import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister;
 import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister;
-import org.apache.streams.monitoring.persist.impl.SLF4JMessagePersister;
+import org.apache.streams.monitoring.persist.impl.Slf4jMessagePersister;
 import org.apache.streams.pojo.json.Broadcast;
 import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
 import org.apache.streams.pojo.json.MemoryUsageBroadcast;
 import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
 import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 
-import javax.management.*;
 import java.lang.management.ManagementFactory;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.management.MBeanServer;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.ObjectName;
 
 /**
  * This thread runs inside of a Streams runtime and periodically persists information
- * from relevant JMX beans
+ * from relevant JMX beans.
  */
 public class BroadcastMonitorThread extends NotificationBroadcasterSupport implements Runnable {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class);
-    private static MBeanServer server;
-
-    private MonitoringConfiguration configuration;
-    private URI broadcastURI = null;
-    private MessagePersister messagePersister;
-    private volatile boolean keepRunning;
-
-    private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
-
-    /**
-     * DEPRECATED
-     * Please initialize logging with monitoring object via typesafe
-     * @param streamConfig
-     */
-    @Deprecated
-    public BroadcastMonitorThread(Map<String, Object> streamConfig) {
-        this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class));
-    }
-
-    public BroadcastMonitorThread(StreamsConfiguration streamConfig) {
-        this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class));
-    }
-
-    public BroadcastMonitorThread(MonitoringConfiguration configuration) {
-
-        this.configuration = configuration;
-        if( this.configuration == null )
-            this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
-
-        LOGGER.info("BroadcastMonitorThread created");
-
-        initializeObjectMapper();
-
-        prepare();
-
-        LOGGER.info("BroadcastMonitorThread initialized");
 
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class);
+  private static MBeanServer server;
+
+  private MonitoringConfiguration configuration;
+  private URI broadcastUri = null;
+  private MessagePersister messagePersister;
+  private volatile boolean keepRunning;
+
+  private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
+
+  /**
+   * DEPRECATED
+   * Please initialize logging with monitoring object via typesafe.
+   * @param streamConfig streamConfig map.
+   */
+  @Deprecated
+  public BroadcastMonitorThread(Map<String, Object> streamConfig) {
+    this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class));
+  }
+
+  public BroadcastMonitorThread(StreamsConfiguration streamConfig) {
+    this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"), MonitoringConfiguration.class));
+  }
+
+  /**
+   * BroadcastMonitorThread constructor - uses supplied MonitoringConfiguration.
+   * @param configuration MonitoringConfiguration
+   */
+  public BroadcastMonitorThread(MonitoringConfiguration configuration) {
+
+    this.configuration = configuration;
+    if ( this.configuration == null ) {
+      this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
     }
 
-    /**
-     * Initialize our object mapper with all of our bean's custom deserializers
-     * This way we can convert them to and from Strings dictated by our
-     * POJOs which are generated from JSON schemas
-     */
-    private void initializeObjectMapper() {
-        SimpleModule simpleModule = new SimpleModule();
-
-        simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
-        simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer());
-        simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer());
-        simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer());
-
-        objectMapper.registerModule(simpleModule);
-        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
+    LOGGER.info("BroadcastMonitorThread created");
+
+    initializeObjectMapper();
+
+    prepare();
+
+    LOGGER.info("BroadcastMonitorThread initialized");
+
+  }
+
+  /**
+   * Initialize our object mapper with all of our bean's custom deserializers.
+   * This way we can convert them to and from Strings dictated by our
+   * POJOs which are generated from JSON schemas.
+   */
+  private void initializeObjectMapper() {
+    SimpleModule simpleModule = new SimpleModule();
+
+    simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
+    simpleModule.addDeserializer(ThroughputQueueBroadcast.class, new ThroughputQueueDeserializer());
+    simpleModule.addDeserializer(StreamsTaskCounterBroadcast.class, new StreamsTaskCounterDeserializer());
+    simpleModule.addDeserializer(DatumStatusCounterBroadcast.class, new DatumStatusCounterDeserializer());
+
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  /**
+   * Get all relevant JMX beans, convert their values to strings, and then persist them.
+   */
+  @Override
+  public void run() {
+    LOGGER.info("BroadcastMonitorThread running");
+    while (keepRunning) {
+      try {
+        List<String> messages = Lists.newArrayList();
+        Set<ObjectName> beans = server.queryNames(null, null);
+
+        for (ObjectName name : beans) {
+          String item = objectMapper.writeValueAsString(name);
+          Broadcast broadcast = null;
+
+          if (name.getKeyPropertyList().get("type") != null) {
+            if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) {
+              broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class);
+            } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) {
+              broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class);
+            } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) {
+              broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class);
+            } else if (name.getKeyPropertyList().get("type").equals("Memory")) {
+              broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class);
+            }
 
-    /**
-     * Get all relevant JMX beans, convert their values to strings, and then persist them
-     */
-    @Override
-    public void run() {
-        LOGGER.info("BroadcastMonitorThread running");
-        while(keepRunning) {
-            try {
-                List<String> messages = Lists.newArrayList();
-                Set<ObjectName> beans = server.queryNames(null, null);
-
-                for(ObjectName name : beans) {
-                    String item = objectMapper.writeValueAsString(name);
-                    Broadcast broadcast = null;
-
-                    if(name.getKeyPropertyList().get("type") != null) {
-                        if (name.getKeyPropertyList().get("type").equals("ThroughputQueue")) {
-                            broadcast = objectMapper.readValue(item, ThroughputQueueBroadcast.class);
-                        } else if (name.getKeyPropertyList().get("type").equals("StreamsTaskCounter")) {
-                            broadcast = objectMapper.readValue(item, StreamsTaskCounterBroadcast.class);
-                        } else if (name.getKeyPropertyList().get("type").equals("DatumStatusCounter")) {
-                            broadcast = objectMapper.readValue(item, DatumStatusCounterBroadcast.class);
-                        } else if (name.getKeyPropertyList().get("type").equals("Memory")) {
-                            broadcast = objectMapper.readValue(item, MemoryUsageBroadcast.class);
-                        }
-
-                        if(broadcast != null) {
-                            messages.add(objectMapper.writeValueAsString(broadcast));
-                        }
-                    }
-                }
-
-                messagePersister.persistMessages(messages);
-                Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
-            } catch (InterruptedException e) {
-                LOGGER.debug("Broadcast Monitor Interrupted!");
-                Thread.currentThread().interrupt();
-                this.keepRunning = false;
-            } catch (Exception e) {
-                LOGGER.error("Exception: {}", e);
-                this.keepRunning = false;
+            if (broadcast != null) {
+              messages.add(objectMapper.writeValueAsString(broadcast));
             }
+          }
         }
+
+        messagePersister.persistMessages(messages);
+        Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
+      } catch (InterruptedException ex) {
+        LOGGER.debug("Broadcast Monitor Interrupted!");
+        Thread.currentThread().interrupt();
+        this.keepRunning = false;
+      } catch (Exception ex) {
+        LOGGER.error("Exception: {}", ex);
+        this.keepRunning = false;
+      }
     }
+  }
 
-    public void prepare() {
+  /**
+   * prepare for execution.
+   */
+  public void prepare() {
 
-        keepRunning = true;
+    keepRunning = true;
 
-        LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
+    LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
 
-        server = ManagementFactory.getPlatformMBeanServer();
+    server = ManagementFactory.getPlatformMBeanServer();
 
-        if (this.configuration != null &&
-            this.configuration.getBroadcastURI() != null) {
+    if (this.configuration != null && this.configuration.getBroadcastURI() != null) {
 
-            try {
-                broadcastURI = new URI(configuration.getBroadcastURI());
-            } catch (Exception e) {
-                LOGGER.error("invalid URI: ", e);
-            }
+      try {
+        broadcastUri = new URI(configuration.getBroadcastURI());
+      } catch (Exception ex) {
+        LOGGER.error("invalid URI: ", ex);
+      }
 
-            if (broadcastURI != null) {
-                if (broadcastURI.getScheme().equals("http")) {
-                    messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
-                } else if (broadcastURI.getScheme().equals("udp")) {
-                    messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
-                } else {
-                    LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
-                    throw new RuntimeException();
-                }
-            } else {
-                messagePersister = new SLF4JMessagePersister();
-            }
+      if (broadcastUri != null) {
+        if (broadcastUri.getScheme().equals("http")) {
+          messagePersister = new BroadcastMessagePersister(broadcastUri.toString());
+        } else if (broadcastUri.getScheme().equals("udp")) {
+          messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString());
         } else {
-            messagePersister = new SLF4JMessagePersister();
+          LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
+          throw new RuntimeException();
         }
-
+      } else {
+        messagePersister = new Slf4jMessagePersister();
+      }
+    } else {
+      messagePersister = new Slf4jMessagePersister();
     }
 
-    public void shutdown() {
-        this.keepRunning = false;
-        LOGGER.debug("Shutting down BroadcastMonitor Thread");
-    }
+  }
 
-    public String getBroadcastURI() {
-        return configuration.getBroadcastURI();
-    }
+  public void shutdown() {
+    this.keepRunning = false;
+    LOGGER.debug("Shutting down BroadcastMonitor Thread");
+  }
 
-    public long getWaitTime() {
-        return configuration.getMonitoringBroadcastIntervalMs();
-    }
+  public String getBroadcastUri() {
+    return configuration.getBroadcastURI();
+  }
+
+  public long getWaitTime() {
+    return configuration.getMonitoringBroadcastIntervalMs();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
index 1c68239..8bf3219 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/jackson/MemoryUsageDeserializerTest.java
@@ -15,13 +15,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.jackson;
 
+import org.apache.streams.pojo.json.MemoryUsageBroadcast;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.pojo.json.MemoryUsageBroadcast;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -35,43 +37,46 @@ import static org.junit.Assert.assertNotNull;
 
 public class MemoryUsageDeserializerTest {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
-    private ObjectMapper objectMapper;
+  private static final Logger LOGGER = LoggerFactory.getLogger(MemoryUsageDeserializerTest.class);
+  private ObjectMapper objectMapper;
 
-    @Before
-    public void setup() {
-        objectMapper = StreamsJacksonMapper.getInstance();
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
-        objectMapper.registerModule(simpleModule);
-        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
+  /**
+   * setup.
+   */
+  @Before
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
 
-    @Test
-    public void serDeTest() {
-        InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
+  @Test
+  public void serDeTest() {
+    InputStream is = MemoryUsageDeserializerTest.class.getResourceAsStream("/MemoryUsageObjects.json");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
 
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if (!StringUtils.isEmpty(line)) {
-                    LOGGER.info("raw: {}", line);
-                    MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        if (!StringUtils.isEmpty(line)) {
+          LOGGER.info("raw: {}", line);
+          MemoryUsageBroadcast broadcast = objectMapper.readValue(line, MemoryUsageBroadcast.class);
 
-                    LOGGER.info("activity: {}", broadcast);
+          LOGGER.info("activity: {}", broadcast);
 
-                    assertNotNull(broadcast);
-                    assertNotNull(broadcast.getVerbose());
-                    assertNotNull(broadcast.getObjectPendingFinalizationCount());
-                    assertNotNull(broadcast.getHeapMemoryUsage());
-                    assertNotNull(broadcast.getNonHeapMemoryUsage());
-                    assertNotNull(broadcast.getName());
-                }
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while testing serializability: {}", e);
+          assertNotNull(broadcast);
+          assertNotNull(broadcast.getVerbose());
+          assertNotNull(broadcast.getObjectPendingFinalizationCount());
+          assertNotNull(broadcast.getHeapMemoryUsage());
+          assertNotNull(broadcast.getNonHeapMemoryUsage());
+          assertNotNull(broadcast.getName());
         }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while testing serializability: {}", ex);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
index 6e7ff6d..fc2ff71 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/BroadcastMessagePersisterTest.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.persist.impl;
 
 import com.google.common.collect.Lists;
@@ -28,33 +29,33 @@ import static org.junit.Assert.assertNotNull;
 
 public class BroadcastMessagePersisterTest {
 
-    @Test
-    public void testFailedPersist() {
-        BroadcastMessagePersister persister = new BroadcastMessagePersister("http://fake.url.com/fake_endpointasdfasdfas");
-
-        List<String> messages = Lists.newArrayList();
-        for(int x = 0; x < 10; x ++) {
-            messages.add("Fake_message #" + x);
-        }
-
-        int statusCode = persister.persistMessages(messages);
+  @Test
+  public void testFailedPersist() {
+    BroadcastMessagePersister persister = new BroadcastMessagePersister("http://fake.url.com/fake_endpointasdfasdfas");
 
-        assertNotNull(statusCode);
-        assertNotEquals(statusCode, 200);
+    List<String> messages = Lists.newArrayList();
+    for (int x = 0; x < 10; x++) {
+      messages.add("Fake_message #" + x);
     }
 
-    @Test
-    public void testInvalidURL() {
-        BroadcastMessagePersister persister = new BroadcastMessagePersister("h");
+    int statusCode = persister.persistMessages(messages);
 
-        List<String> messages = Lists.newArrayList();
-        for(int x = 0; x < 10; x ++) {
-            messages.add("Fake_message #" + x);
-        }
+    assertNotNull(statusCode);
+    assertNotEquals(statusCode, 200);
+  }
 
-        int statusCode = persister.persistMessages(messages);
+  @Test
+  public void testInvalidUrl() {
+    BroadcastMessagePersister persister = new BroadcastMessagePersister("h");
 
-        assertNotNull(statusCode);
-        assertEquals(statusCode, -1);
+    List<String> messages = Lists.newArrayList();
+    for (int x = 0; x < 10; x++) {
+      messages.add("Fake_message #" + x);
     }
+
+    int statusCode = persister.persistMessages(messages);
+
+    assertNotNull(statusCode);
+    assertEquals(statusCode, -1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
index faa99a2..3f9a4c1 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.persist.impl;
 
 import com.google.common.base.Splitter;
@@ -29,47 +30,51 @@ import java.net.DatagramSocket;
 import java.net.SocketException;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class LogstashUdpMessagePersisterTest {
 
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class);
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class);
 
-    DatagramSocket socket = null;
+  DatagramSocket socket = null;
 
-    @Before
-    public void setup() {
-        try {
-            socket = new DatagramSocket(56789);
-        } catch (SocketException e) {
-            LOGGER.error("Metrics Broadcast Test Setup Failed: " + e.getMessage());
-        }
+  /**
+   * setup.
+   */
+  @Before
+  public void setup() {
+    try {
+      socket = new DatagramSocket(56789);
+    } catch (SocketException ex) {
+      LOGGER.error("Metrics Broadcast Test Setup Failed: " + ex.getMessage());
     }
+  }
 
 
-    @Test
-    public void testFailedPersist() {
-        LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789");
-
-        List<String> messageArray = Lists.newArrayList();
-        for(int x = 0; x < 10; x ++) {
-            messageArray.add("Fake_message #" + x);
-        }
+  @Test
+  public void testFailedPersist() {
+    LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789");
 
-        persister.persistMessages(messageArray);
-        byte[] receiveData = new byte[1024];
+    List<String> messageArray = Lists.newArrayList();
+    for (int x = 0; x < 10; x ++) {
+      messageArray.add("Fake_message #" + x);
+    }
 
-        DatagramPacket messageDatagram = new DatagramPacket(receiveData, receiveData.length);
+    persister.persistMessages(messageArray);
+    byte[] receiveData = new byte[1024];
 
-        try {
-            socket.receive(messageDatagram);
-            assertNotNull(messageDatagram);
-            List<String> messages = Lists.newArrayList(Splitter.on('\n').split(new String(messageDatagram.getData())));
-            assertEquals(messageArray, messages.subList(0,10));
-        } catch (IOException e) {
-            LOGGER.error("Metrics Broadcast Test Failed: " + e.getMessage());
-        }
+    DatagramPacket messageDatagram = new DatagramPacket(receiveData, receiveData.length);
 
+    try {
+      socket.receive(messageDatagram);
+      assertNotNull(messageDatagram);
+      List<String> messages = Lists.newArrayList(Splitter.on('\n').split(new String(messageDatagram.getData())));
+      assertEquals(messageArray, messages.subList(0,10));
+    } catch (IOException ex) {
+      LOGGER.error("Metrics Broadcast Test Failed: " + ex.getMessage());
     }
 
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
index a959bd2..ad1bf05 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
@@ -15,67 +15,59 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.monitoring.tasks;
 
-import com.google.common.collect.Maps;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.local.monitoring.MonitoringConfiguration;
-import org.junit.Ignore;
+
 import org.junit.Test;
 
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 public class BroadcastMonitorThreadTest {
-    private ExecutorService executor;
 
-    @Test
-    public void testThreadEmptyBeanConfig() {
-        StreamsConfiguration streamsConfiguration = new StreamsConfiguration();
-        BroadcastMonitorThread thread = new BroadcastMonitorThread(streamsConfiguration);
-        testThread(thread);
-    }
+  private ExecutorService executor;
 
-    @Test
-    public void testThreadEmptyMapConfig() {
-        Map<String, Object> map = Maps.newHashMap();
-        BroadcastMonitorThread thread = new BroadcastMonitorThread(map);
-        testThread(thread);
-    }
+  @Test
+  public void testThreadEmptyBeanConfig() {
+    StreamsConfiguration streamsConfiguration = new StreamsConfiguration();
+    BroadcastMonitorThread thread = new BroadcastMonitorThread(streamsConfiguration);
+    testThread(thread);
+  }
 
-    @Test
-    public void testThreadFakeMapConfig() {
-        Map<String, Object> config = Maps.newHashMap();
-        config.put("broadcastURI", "http://fakeurl.com/fake");
-        BroadcastMonitorThread thread = new BroadcastMonitorThread(config);
-        testThread(thread);
-    }
 
-    @Test
-    public void testThreadStreamsConfig() {
 
-        StreamsConfiguration streams = new StreamsConfiguration();
-        MonitoringConfiguration monitoring = new MonitoringConfiguration();
-        monitoring.setBroadcastURI("http://fakeurl.com/fake");
-        monitoring.setMonitoringBroadcastIntervalMs(30000L);
-        streams.setAdditionalProperty("monitoring", monitoring);
-        BroadcastMonitorThread thread = new BroadcastMonitorThread(streams);
-        testThread(thread);
-    }
 
-    public void testThread(BroadcastMonitorThread thread) {
-        long testRunLength = thread.getWaitTime() * 1;
-        executor = Executors.newFixedThreadPool(1);
-        executor.submit(thread);
+  @Test
+  public void testThreadStreamsConfig() {
 
-        try {
-            Thread.sleep(testRunLength);
-        } catch(InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
+    StreamsConfiguration streams = new StreamsConfiguration();
+    MonitoringConfiguration monitoring = new MonitoringConfiguration();
+    monitoring.setBroadcastURI("http://fakeurl.com/fake");
+    monitoring.setMonitoringBroadcastIntervalMs(30000L);
+    streams.setAdditionalProperty("monitoring", monitoring);
+    BroadcastMonitorThread thread = new BroadcastMonitorThread(streams);
+    testThread(thread);
+  }
 
-        executor.shutdown();
+  /**
+   * Base Test.
+   * @param thread BroadcastMonitorThread
+   */
+  public void testThread(BroadcastMonitorThread thread) {
+    long testRunLength = thread.getWaitTime() * 1;
+    executor = Executors.newFixedThreadPool(1);
+    executor.submit(thread);
+
+    try {
+      Thread.sleep(testRunLength);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
     }
 
+    executor.shutdown();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
index 964fff6..971b99f 100644
--- a/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
+++ b/streams-plugins/streams-plugin-cassandra/src/main/java/org/apache/streams/plugins/cassandra/StreamsCassandraGenerationConfig.java
@@ -16,9 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.plugins.cassandra;
 
 import org.apache.streams.util.schema.GenerationConfig;
+
 import org.jsonschema2pojo.DefaultGenerationConfig;
 import org.jsonschema2pojo.util.URLUtil;
 
@@ -32,68 +34,71 @@ import java.util.List;
 import java.util.Set;
 
 /**
- * Configures StreamsHiveResourceGenerator
- *
- *
+ * Configures StreamsCassandraResourceGenerator.
  */
 public class StreamsCassandraGenerationConfig extends DefaultGenerationConfig implements GenerationConfig {
 
-    public String getSourceDirectory() {
-        return sourceDirectory;
-    }
+  public String getSourceDirectory() {
+    return sourceDirectory;
+  }
 
-    public List<String> getSourcePaths() {
-        return sourcePaths;
-    }
+  public List<String> getSourcePaths() {
+    return sourcePaths;
+  }
 
-    private String sourceDirectory;
-    private List<String> sourcePaths = new ArrayList<String>();
-    private String targetDirectory;
-    private int maxDepth = 1;
+  private String sourceDirectory;
+  private List<String> sourcePaths = new ArrayList<String>();
+  private String targetDirectory;
+  private int maxDepth = 1;
 
-    public Set<String> getExclusions() {
-        return exclusions;
-    }
+  public Set<String> getExclusions() {
+    return exclusions;
+  }
 
-    public void setExclusions(Set<String> exclusions) {
-        this.exclusions = exclusions;
-    }
+  public void setExclusions(Set<String> exclusions) {
+    this.exclusions = exclusions;
+  }
 
-    private Set<String> exclusions = new HashSet<String>();
+  private Set<String> exclusions = new HashSet<String>();
 
-    public int getMaxDepth() {
-        return maxDepth;
-    }
+  public int getMaxDepth() {
+    return maxDepth;
+  }
 
-    public void setSourceDirectory(String sourceDirectory) {
-        this.sourceDirectory = sourceDirectory;
-    }
+  public void setSourceDirectory(String sourceDirectory) {
+    this.sourceDirectory = sourceDirectory;
+  }
 
-    public void setSourcePaths(List<String> sourcePaths) {
-        this.sourcePaths = sourcePaths;
-    }
+  public void setSourcePaths(List<String> sourcePaths) {
+    this.sourcePaths = sourcePaths;
+  }
 
-    public void setTargetDirectory(String targetDirectory) {
-        this.targetDirectory = targetDirectory;
-    }
+  public void setTargetDirectory(String targetDirectory) {
+    this.targetDirectory = targetDirectory;
+  }
 
-    public File getTargetDirectory() {
-        return new File(targetDirectory);
-    }
+  public File getTargetDirectory() {
+    return new File(targetDirectory);
+  }
 
-    public Iterator<URL> getSource() {
-        if (null != sourceDirectory) {
-            return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
-        }
-        List<URL> sourceURLs = new ArrayList<URL>();
-        if( sourcePaths != null && sourcePaths.size() > 0)
-            for (String source : sourcePaths) {
-                sourceURLs.add(URLUtil.parseURL(source));
-            }
-        return sourceURLs.iterator();
+  /**
+   * get all sources.
+   * @return Iterator of URL
+   */
+  public Iterator<URL> getSource() {
+    if (null != sourceDirectory) {
+      return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
     }
-
-    public void setMaxDepth(int maxDepth) {
-        this.maxDepth = maxDepth;
+    List<URL> sourceUrls = new ArrayList<URL>();
+    if ( sourcePaths != null && sourcePaths.size() > 0) {
+      for (String source : sourcePaths) {
+        sourceUrls.add(URLUtil.parseURL(source));
+      }
     }
+    return sourceUrls.iterator();
+  }
+
+  public void setMaxDepth(int maxDepth) {
+    this.maxDepth = maxDepth;
+  }
 }