You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:18 UTC

[17/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
index 8867fd7..4627bef 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
@@ -16,49 +16,46 @@
  */
 package org.apache.eagle.alert.coordination.model;
 
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
-
 /**
  * Convert incoming tuple to stream
  * incoming tuple consists of 2 fields, topic and map of key/value
- * output stream consists of 3 fields, stream name, timestamp, and map of key/value
+ * output stream consists of 3 fields, stream name, timestamp, and map of key/value.
  */
 public class Tuple2StreamConverter {
     private static final Logger LOG = LoggerFactory.getLogger(Tuple2StreamConverter.class);
     private Tuple2StreamMetadata metadata;
     private StreamNameSelector cachedSelector;
-    public Tuple2StreamConverter(Tuple2StreamMetadata metadata){
+
+    public Tuple2StreamConverter(Tuple2StreamMetadata metadata) {
         this.metadata = metadata;
         try {
-            cachedSelector = (StreamNameSelector)Class.forName(metadata.getStreamNameSelectorCls()).
-                    getConstructor(Properties.class).
-                    newInstance(metadata.getStreamNameSelectorProp());
-        }catch(Exception ex){
+            cachedSelector = (StreamNameSelector) Class.forName(metadata.getStreamNameSelectorCls())
+                .getConstructor(Properties.class)
+                .newInstance(metadata.getStreamNameSelectorProp());
+        } catch (Exception ex) {
             LOG.error("error initializing StreamNameSelector object", ex);
             throw new IllegalStateException(ex);
         }
     }
 
     /**
-     * Assume tuple is composed of topic + map of key/value
-     * @param tuple
-     * @return
+     * Assume tuple is composed of topic + map of key/value.
      */
-    @SuppressWarnings({ "unchecked" })
-    public List<Object> convert(List<Object> tuple){
-        Map<String, Object> m = (Map<String, Object>)tuple.get(1);
+    @SuppressWarnings( {"unchecked"})
+    public List<Object> convert(List<Object> tuple) {
+        Map<String, Object> m = (Map<String, Object>) tuple.get(1);
         String streamName = cachedSelector.getStreamName(m);
-        if(!metadata.getActiveStreamNames().contains(streamName)) {
-            if(LOG.isDebugEnabled()) {
+        if (!metadata.getActiveStreamNames().contains(streamName)) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("streamName {} is not within activeStreamNames {}", streamName, metadata.getActiveStreamNames());
             }
             return null;
@@ -81,14 +78,15 @@ public class Tuple2StreamConverter {
                     LOG.debug("continue with current timestamp becuase no data format sepcified! Metadata : {} ", metadata);
                 }
                 timestamp = System.currentTimeMillis();
-            } else 
-            
-            try {
-                SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat());
-                timestamp = sdf.parse(timestampFieldValue).getTime();
-            } catch (Exception ex) {
-                LOG.error("continue with current timestamp because error happens while parsing timestamp column " + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat());
-                timestamp = System.currentTimeMillis();
+            } else {
+                try {
+                    SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat());
+                    timestamp = sdf.parse(timestampFieldValue).getTime();
+                } catch (Exception ex) {
+                    LOG.error("continue with current timestamp because error happens while parsing timestamp column "
+                        + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat());
+                    timestamp = System.currentTimeMillis();
+                }
             }
         }
         return Arrays.asList(tuple.get(0), streamName, timestamp, tuple.get(1));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
index bde4fe3..788547d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
@@ -21,21 +21,23 @@ import java.util.Properties;
 import java.util.Set;
 
 /**
- * @Since 4/25/16. This metadata controls how tuple is transformed to stream for
- *        example raw data consists of {"metric" : "cpuUsage", "host" :
- *        "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric"
- *        is used for creating stream name, here "cpuUsage" is stream name
+ * This metadata controls how tuple is transformed to stream for
+ * example raw data consists of {"metric" : "cpuUsage", "host" :
+ * "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric"
+ * is used for creating stream name, here "cpuUsage" is stream name
  *
- *        metric could be "cpuUsage", "diskUsage", "memUsage" etc, so
- *        activeStreamNames are subset of all metric names
+ * <p>metric could be "cpuUsage", "diskUsage", "memUsage" etc, so
+ * activeStreamNames are subset of all metric names</p>
  *
- *        All other messages which are not one of activeStreamNames will be
- *        filtered out
+ * <p>All other messages which are not one of activeStreamNames will be
+ * filtered out.</p>
+ *
+ * @since 4/25/16
  */
 public class Tuple2StreamMetadata {
     /**
      * only messages belonging to activeStreamNames will be kept while
-     * transforming tuple into stream
+     * transforming tuple into stream.
      */
     private Set<String> activeStreamNames = new HashSet<String>();
     // the specific stream name selector

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
index f4b8ccb..bbd4178 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
@@ -18,10 +18,6 @@ package org.apache.eagle.alert.coordination.model;
 
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 
-/**
- * @since May 25, 2016
- *
- */
 public class VersionedPolicyDefinition {
     private String version;
     private PolicyDefinition definition;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
index 2770aa1..c9f830b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
@@ -18,10 +18,6 @@ package org.apache.eagle.alert.coordination.model;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
-/**
- * @since May 25, 2016
- *
- */
 public class VersionedStreamDefinition {
     private String version;
     private StreamDefinition definition;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
index 3f6f36d..9353dbd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
@@ -63,8 +63,8 @@ public class WorkSlot {
             return false;
         }
         WorkSlot workSlot = (WorkSlot) other;
-        return Objects.equals(topologyName, workSlot.topologyName) &&
-                Objects.equals(boltId, workSlot.boltId);
+        return Objects.equals(topologyName, workSlot.topologyName)
+            && Objects.equals(boltId, workSlot.boltId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
index beda896..e72836e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
@@ -16,22 +16,19 @@
  */
 package org.apache.eagle.alert.coordination.model.internal;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
 /**
  * A monitored stream is the unique data set in the system.
- * 
- * It's a combination of stream and the specific grp-by on it.
- * 
- * For correlation stream, it means multiple stream for a given monitored stream.
- * 
- * 
- * @since Apr 27, 2016
  *
+ * <p>It's a combination of stream and the specific grp-by on it.
+ *
+ * <p>For correlation stream, it means multiple stream for a given monitored stream.
+ *
+ * @since Apr 27, 2016
  */
 public class MonitoredStream {
 
@@ -40,7 +37,7 @@ public class MonitoredStream {
     // the stream group that this monitored stream stands for
     private StreamGroup streamGroup = new StreamGroup();
     private List<StreamWorkSlotQueue> queues = new ArrayList<StreamWorkSlotQueue>();
-    
+
     public MonitoredStream() {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
index 3e956ca..7747d58 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
@@ -17,10 +17,9 @@
 package org.apache.eagle.alert.coordination.model.internal;
 
 /**
- * monitor metadata
- * 
- * @since Apr 27, 2016
+ * monitor metadata.
  *
+ * @since Apr 27, 2016
  */
 public class PolicyAssignment {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
index a1efbf9..2462119 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
@@ -17,12 +17,9 @@
 package org.apache.eagle.alert.coordination.model.internal;
 
 /**
- *
- * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis
- *
+ * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis.
  *
  * @since Aug 10, 2016
- *
  */
 public class ScheduleStateBase {
     private String version;
@@ -81,6 +78,4 @@ public class ScheduleStateBase {
     }
 
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
index d87d62b..7941b85 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
@@ -16,26 +16,23 @@
  */
 package org.apache.eagle.alert.coordination.model.internal;
 
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.base.Objects;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
 
-/**
- * @since May 6, 2016
- *
- */
 public class StreamGroup {
 
     private List<StreamPartition> streamPartitions = new ArrayList<StreamPartition>();
-    
+
     public StreamGroup() {
     }
-    
+
     public List<StreamPartition> getStreamPartitions() {
         return streamPartitions;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
index f4f6142..86b150c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
@@ -16,22 +16,20 @@
  */
 package org.apache.eagle.alert.coordination.model.internal;
 
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
 /**
  * A work queue for given one monitored stream.
- * 
- * Analog to storm's "tasks for given bolt".
- * 
- * @since Apr 27, 2016
  *
+ * <p>Analog to storm's "tasks for given bolt".
+ *
+ * @since Apr 27, 2016
  */
 public class StreamWorkSlotQueue {
     private String queueId;
@@ -40,15 +38,15 @@ public class StreamWorkSlotQueue {
     private boolean dedicated;
     // some dedicated option, like dedicated userId/tenantId/policyId.
     private Map<String, Object> dedicateOption;
-    
+
     private int numberOfGroupBolts;
-    private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>(); 
+    private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>();
 
     public StreamWorkSlotQueue() {
     }
-    
+
     public StreamWorkSlotQueue(StreamGroup par, boolean isDedicated, Map<String, Object> options,
-            List<WorkSlot> slots) {
+                               List<WorkSlot> slots) {
         this.queueId = par.getStreamId() + System.currentTimeMillis();// simply generate a queue
         this.dedicated = isDedicated;
         dedicateOption = new HashMap<String, Object>();
@@ -81,11 +79,11 @@ public class StreamWorkSlotQueue {
         return workingSlots.size();
     }
 
-//    @org.codehaus.jackson.annotate.JsonIgnore
-//    @JsonIgnore
-//    public void placePolicy(PolicyDefinition pd) {
-//        policies.add(pd.getName());
-//    }
+    //    @org.codehaus.jackson.annotate.JsonIgnore
+    //    @JsonIgnore
+    //    public void placePolicy(PolicyDefinition pd) {
+    //        policies.add(pd.getName());
+    //    }
 
     public int getNumberOfGroupBolts() {
         return numberOfGroupBolts;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
index 189e2a5..c41c867 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
@@ -20,10 +20,12 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * @since Mar 24, 2016 Logically one unit topology consists of S spouts, G
- *        groupby bolts, A alertBolts normally S=1 Physically each spout is
- *        composed of s spout nodes, each groupby bolt is composed of g groupby
- *        nodes, and each alert bolt is composed of a alert nodes
+ * Logically one unit topology consists of S spouts, G
+ * groupby bolts, A alertBolts normally S=1 Physically each spout is
+ * composed of s spout nodes, each groupby bolt is composed of g groupby
+ * nodes, and each alert bolt is composed of a alert nodes.
+ *
+ * @since Mar 24, 2016
  */
 public class Topology {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
index dd26d20..ac375e1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java
@@ -19,8 +19,7 @@ package org.apache.eagle.alert.engine.codec;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 
 /**
- * @since Jun 3, 2016
- *
+ * @since Jun 3, 2016.
  */
 public interface IEventSerializer {
     Object serialize(AlertStreamEvent event);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index c54955f..680b21a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -25,11 +25,10 @@ import java.io.Serializable;
 import java.util.*;
 
 /**
- * @since Apr 5, 2016
- *
+ * @since Apr 5, 2016.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class PolicyDefinition implements Serializable{
+public class PolicyDefinition implements Serializable {
     private static final long serialVersionUID = 377581499339572414L;
     // unique identifier
     private String name;
@@ -116,48 +115,50 @@ public class PolicyDefinition implements Serializable{
     }
 
     public PolicyStatus getPolicyStatus() {
-		return policyStatus;
-	}
+        return policyStatus;
+    }
 
-	public void setPolicyStatus(PolicyStatus policyStatus) {
-		this.policyStatus = policyStatus;
-	}
+    public void setPolicyStatus(PolicyStatus policyStatus) {
+        this.policyStatus = policyStatus;
+    }
 
-	@Override
+    @Override
     public int hashCode() {
-        return new HashCodeBuilder().
-                append(name).
-                append(inputStreams).
-                append(outputStreams).
-                append(definition).
-                append(partitionSpec).
-//                append(parallelismHint).
-                build();
+        return new HashCodeBuilder()
+            .append(name)
+            .append(inputStreams)
+            .append(outputStreams)
+            .append(definition)
+            .append(partitionSpec)
+            // .append(parallelismHint)
+            .build();
     }
 
     @Override
-    public boolean equals(Object that){
-        if(that == this)
+    public boolean equals(Object that) {
+        if (that == this) {
             return true;
-        if(! (that instanceof PolicyDefinition))
+        }
+        if (!(that instanceof PolicyDefinition)) {
             return false;
-        PolicyDefinition another = (PolicyDefinition)that;
-        if(Objects.equals(another.name, this.name) &&
-        		Objects.equals(another.description, this.description) &&
-                CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) &&
-                CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) &&
-                another.definition.equals(this.definition) &&
-                Objects.equals(this.definition, another.definition) &&
-                CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) 
-//                && another.parallelismHint == this.parallelismHint
-                ) {
+        }
+        PolicyDefinition another = (PolicyDefinition) that;
+        if (Objects.equals(another.name, this.name)
+            && Objects.equals(another.description, this.description)
+            && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
+            && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
+            && another.definition.equals(this.definition)
+            && Objects.equals(this.definition, another.definition)
+            && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
+            // && another.parallelismHint == this.parallelismHint
+            ) {
             return true;
         }
         return false;
     }
 
     @JsonIgnoreProperties(ignoreUnknown = true)
-    public static class Definition implements Serializable{
+    public static class Definition implements Serializable {
         private static final long serialVersionUID = -622366527887848346L;
 
         public String type;
@@ -168,7 +169,7 @@ public class PolicyDefinition implements Serializable{
         private List<String> inputStreams = new ArrayList<String>();
         private List<String> outputStreams = new ArrayList<String>();
 
-        public Definition(String type,String value){
+        public Definition(String type, String value) {
             this.type = type;
             this.value = value;
         }
@@ -184,17 +185,20 @@ public class PolicyDefinition implements Serializable{
         }
 
         @Override
-        public boolean equals(Object that){
-            if(that == this)
+        public boolean equals(Object that) {
+            if (that == this) {
                 return true;
-            if(!(that instanceof Definition))
+            }
+            if (!(that instanceof Definition)) {
                 return false;
-            Definition another = (Definition)that;
-            if(another.type.equals(this.type)
-                    && another.value.equals(this.value)
-                    && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
-                    && ListUtils.isEqualList(another.outputStreams, this.outputStreams))
+            }
+            Definition another = (Definition) that;
+            if (another.type.equals(this.type)
+                && another.value.equals(this.value)
+                && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
+                && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) {
                 return true;
+            }
             return false;
         }
 
@@ -248,16 +252,16 @@ public class PolicyDefinition implements Serializable{
 
         @Override
         public String toString() {
-            return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }",type,value, inputStreams, outputStreams);
+            return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }", type, value, inputStreams, outputStreams);
         }
     }
-    
+
     public static enum PolicyStatus {
-    	ENABLED, DISABLED
+        ENABLED, DISABLED
     }
 
     @Override
     public String toString() {
-        return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition()==null?"null": this.getDefinition().toString());
+        return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index e3b4e33..0bada4e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -24,8 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * @since Apr 11, 2016
- *
+ * @since Apr 11, 2016.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Publishment {
@@ -100,8 +99,8 @@ public class Publishment {
         if (obj instanceof Publishment) {
             Publishment p = (Publishment) obj;
             return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
-                    && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
-                    && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties()));
+                && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+                && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties()));
         }
         return false;
     }
@@ -109,14 +108,14 @@ public class Publishment {
     @Override
     public int hashCode() {
         return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds)
-                .append(properties).build();
+            .append(properties).build();
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
-                .append(policyIds).append(",properties:").append(properties);
+            .append(policyIds).append(",properties:").append(properties);
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index daecab4..5329dfa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -18,11 +18,11 @@
 
 package org.apache.eagle.alert.engine.coordinator;
 
-import java.util.Objects;
-
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import java.util.Objects;
+
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class PublishmentType {
 
@@ -39,17 +39,19 @@ public class PublishmentType {
         this.type = type;
     }
 
-    public String getClassName(){
+    public String getClassName() {
         return className;
     }
-    public void setClassName(String className){
+
+    public void setClassName(String className) {
         this.className = className;
     }
 
-    public String getDescription(){
+    public String getDescription() {
         return description;
     }
-    public void setDescription(String description){
+
+    public void setDescription(String description) {
         this.description = description;
     }
 
@@ -65,10 +67,10 @@ public class PublishmentType {
     public boolean equals(Object obj) {
         if (obj instanceof PublishmentType) {
             PublishmentType p = (PublishmentType) obj;
-            return (Objects.equals(className, p.getClassName()) &&
-                    Objects.equals(type, p.type) && 
-                    Objects.equals(description, p.getDescription()) &&
-                    Objects.equals(fields, p.getFields()));
+            return (Objects.equals(className, p.getClassName())
+                && Objects.equals(type, p.type)
+                && Objects.equals(description, p.getDescription())
+                && Objects.equals(fields, p.getFields()));
         }
         return false;
     }
@@ -76,10 +78,10 @@ public class PublishmentType {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-                .append(className)
-                .append(type)
-                .append(description)
-                .append(fields)
-                .build();
+            .append(className)
+            .append(type)
+            .append(description)
+            .append(fields)
+            .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 4483fe4..2be4936 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -16,16 +16,14 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
-
 import javax.xml.bind.annotation.adapters.XmlAdapter;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-
 public class StreamColumn implements Serializable {
     private static final long serialVersionUID = -5457861313624389106L;
     private String name;
@@ -36,19 +34,19 @@ public class StreamColumn implements Serializable {
     private String nodataExpression;
 
     public String toString() {
-        return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]", 
-        		name, type, defaultValue, required, nodataExpression);
+        return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
+            name, type, defaultValue, required, nodataExpression);
     }
 
     public String getNodataExpression() {
-		return nodataExpression;
-	}
+        return nodataExpression;
+    }
 
-	public void setNodataExpression(String nodataExpression) {
-		this.nodataExpression = nodataExpression;
-	}
+    public void setNodataExpression(String nodataExpression) {
+        this.nodataExpression = nodataExpression;
+    }
 
-	public String getName() {
+    public String getName() {
         return name;
     }
 
@@ -71,7 +69,7 @@ public class StreamColumn implements Serializable {
     }
 
     private void ensureDefaultValueType() {
-        if(this.getDefaultValue()!=null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING){
+        if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) {
             switch (this.getType()) {
                 case INT:
                     this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue()));
@@ -90,11 +88,13 @@ public class StreamColumn implements Serializable {
                     break;
                 case OBJECT:
                     try {
-                        this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(),HashMap.class));
+                        this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
                     } catch (IOException e) {
                         throw new IllegalArgumentException(e);
                     }
                     break;
+                default:
+                    throw new IllegalArgumentException("Illegal type: " + this.getType());
             }
         }
     }
@@ -145,7 +145,7 @@ public class StreamColumn implements Serializable {
         }
     }
 
-    public static class StreamColumnTypeAdapter extends XmlAdapter<String,Type>{
+    public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> {
 
         @Override
         public Type unmarshal(String v) throws Exception {
@@ -158,7 +158,7 @@ public class StreamColumn implements Serializable {
         }
     }
 
-    public static class DefaultValueAdapter extends XmlAdapter<String,Object>{
+    public static class DefaultValueAdapter extends XmlAdapter<String, Object> {
         @Override
         public Object unmarshal(String v) throws Exception {
             return v;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index beb8491..9130951 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -16,18 +16,18 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-import javax.xml.bind.annotation.*;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
 
 /**
  * This is actually a data source schema.
- * 
- * @since Apr 5, 2016
  *
+ * @since Apr 5, 2016
  */
-public class StreamDefinition implements Serializable{
+public class StreamDefinition implements Serializable {
     private static final long serialVersionUID = 2352202882328931825L;
     private String streamId;
     private String dataSource;
@@ -37,14 +37,14 @@ public class StreamDefinition implements Serializable{
 
     private List<StreamColumn> columns = new ArrayList<>();
 
-    public String toString(){
+    public String toString() {
         return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s",
-                streamId,
-                dataSource,
-                description,
-                validate,
-                timeseries,
-                columns);
+            streamId,
+            dataSource,
+            description,
+            validate,
+            timeseries,
+            columns);
     }
 
     public String getStreamId() {
@@ -79,7 +79,7 @@ public class StreamDefinition implements Serializable{
         this.timeseries = timeseries;
     }
 
-    @XmlElementWrapper(name="columns")
+    @XmlElementWrapper(name = "columns")
     @XmlElement(name = "column")
     public List<StreamColumn> getColumns() {
         return columns;
@@ -97,10 +97,12 @@ public class StreamDefinition implements Serializable{
         this.dataSource = dataSource;
     }
 
-    public int getColumnIndex(String column){
-        int i=0;
-        for(StreamColumn col:this.getColumns()){
-            if(col.getName().equals(column)) return i;
+    public int getColumnIndex(String column) {
+        int i = 0;
+        for (StreamColumn col : this.getColumns()) {
+            if (col.getName().equals(column)) {
+                return i;
+            }
             i++;
         }
         return -1;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
index 47e15c0..0987463 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
@@ -16,20 +16,19 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-import java.io.Serializable;
-import java.util.*;
-
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.io.Serializable;
+import java.util.*;
 
 /**
  * StreamPartition defines how a data stream is partitioned and sorted
  * streamId is used for distinguishing different streams which are spawned from the same data source
  * type defines how to partition data among slots within one slotqueue
  * columns are fields based on which stream is grouped
- * sortSpec defines how data is sorted
+ * sortSpec defines how data is sorted.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class StreamPartition implements Serializable {
@@ -52,14 +51,15 @@ public class StreamPartition implements Serializable {
 
     @Override
     public boolean equals(Object other) {
-        if (other == this)
+        if (other == this) {
             return true;
+        }
         if (!(other instanceof StreamPartition)) {
             return false;
         }
         StreamPartition sp = (StreamPartition) other;
         return Objects.equals(streamId, sp.streamId) && Objects.equals(type, sp.type)
-                && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
+            && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
     }
 
     @Override
@@ -71,46 +71,52 @@ public class StreamPartition implements Serializable {
         this.type = type;
     }
 
-    public Type getType(){
+    public Type getType() {
         return this.type;
     }
 
-    public enum Type{
-        GLOBAL("GLOBAL",0), GROUPBY("GROUPBY",1), SHUFFLE("SHUFFLE",2);
+    public enum Type {
+        GLOBAL("GLOBAL", 0), GROUPBY("GROUPBY", 1), SHUFFLE("SHUFFLE", 2);
         private final String name;
         private final int index;
-        Type(String name, int index){
+
+        Type(String name, int index) {
             this.name = name;
             this.index = index;
         }
+
         @Override
         public String toString() {
             return this.name;
         }
-        public static Type locate(String type){
+
+        public static Type locate(String type) {
             Type _type = _NAME_TYPE.get(type.toUpperCase());
-            if(_type == null)
-                throw new IllegalStateException("Illegal type name: "+type);
+            if (_type == null) {
+                throw new IllegalStateException("Illegal type name: " + type);
+            }
             return _type;
         }
 
-        public static Type locate(int index){
+        public static Type locate(int index) {
             Type _type = _INDEX_TYPE.get(index);
-            if(_type == null)
-                throw new IllegalStateException("Illegal type index: "+index);
+            if (_type == null) {
+                throw new IllegalStateException("Illegal type index: " + index);
+            }
             return _type;
         }
 
-        private static final Map<String,Type> _NAME_TYPE = new HashMap<>();
-        private static final Map<Integer,Type> _INDEX_TYPE = new TreeMap<>();
+        private static final Map<String, Type> _NAME_TYPE = new HashMap<>();
+        private static final Map<Integer, Type> _INDEX_TYPE = new TreeMap<>();
+
         static {
-            _NAME_TYPE.put(GLOBAL.name,GLOBAL);
-            _NAME_TYPE.put(GROUPBY.name,GROUPBY);
-            _NAME_TYPE.put(SHUFFLE.name,SHUFFLE);
+            _NAME_TYPE.put(GLOBAL.name, GLOBAL);
+            _NAME_TYPE.put(GROUPBY.name, GROUPBY);
+            _NAME_TYPE.put(SHUFFLE.name, SHUFFLE);
 
-            _INDEX_TYPE.put(GLOBAL.index,GLOBAL);
-            _INDEX_TYPE.put(GROUPBY.index,GLOBAL);
-            _INDEX_TYPE.put(SHUFFLE.index,GLOBAL);
+            _INDEX_TYPE.put(GLOBAL.index, GLOBAL);
+            _INDEX_TYPE.put(GROUPBY.index, GLOBAL);
+            _INDEX_TYPE.put(SHUFFLE.index, GLOBAL);
         }
     }
 
@@ -140,6 +146,6 @@ public class StreamPartition implements Serializable {
 
     @Override
     public String toString() {
-        return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]",this.getStreamId(),this.getType(), StringUtils.join(this.getColumns(),","), sortSpec);
+        return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]", this.getStreamId(), this.getType(), StringUtils.join(this.getColumns(), ","), sortSpec);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
index 962a8ee..65b9151 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -16,24 +16,24 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-
+import org.apache.eagle.alert.utils.TimePeriodUtils;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
 import org.joda.time.Period;
 
 import java.io.Serializable;
 
 /**
- * streamId is the key
+ * streamId is the key.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class StreamSortSpec implements Serializable{
+public class StreamSortSpec implements Serializable {
     private static final long serialVersionUID = 3626506441441584937L;
-    private String windowPeriod="";
+    private String windowPeriod = "";
     private int windowMargin = 30 * 1000; // 30 seconds by default
 
-    public StreamSortSpec() {}
+    public StreamSortSpec() {
+    }
 
     public StreamSortSpec(StreamSortSpec spec) {
         this.windowPeriod = spec.windowPeriod;
@@ -45,14 +45,17 @@ public class StreamSortSpec implements Serializable{
     }
 
     public int getWindowPeriodMillis() {
-        if(windowPeriod!=null) {
+        if (windowPeriod != null) {
             return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
-        } else return 0;
+        } else {
+            return 0;
+        }
     }
 
     public void setWindowPeriod(String windowPeriod) {
         this.windowPeriod = windowPeriod;
     }
+
     public void setWindowPeriodMillis(int windowPeriodMillis) {
         this.windowPeriod = Period.millis(windowPeriodMillis).toString();
     }
@@ -71,30 +74,32 @@ public class StreamSortSpec implements Serializable{
     }
 
     @Override
-    public int hashCode(){
-        return new HashCodeBuilder().
-                append(windowPeriod).
-                append(windowMargin).toHashCode();
+    public int hashCode() {
+        return new HashCodeBuilder()
+            .append(windowPeriod)
+            .append(windowMargin)
+            .toHashCode();
     }
 
     @Override
-    public boolean equals(Object that){
-        if(this == that)
+    public boolean equals(Object that) {
+        if (this == that) {
             return true;
-        if(!(that instanceof StreamSortSpec)){
+        }
+        if (!(that instanceof StreamSortSpec)) {
             return false;
         }
 
-        StreamSortSpec another = (StreamSortSpec)that;
-        return 
-                another.windowPeriod.equals(this.windowPeriod) &&
-                another.windowMargin == this.windowMargin;
+        StreamSortSpec another = (StreamSortSpec) that;
+        return
+            another.windowPeriod.equals(this.windowPeriod)
+                && another.windowMargin == this.windowMargin;
     }
 
     @Override
-    public String toString(){
+    public String toString() {
         return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
-                this.getWindowPeriod(),
-                this.getWindowMargin());
+            this.getWindowPeriod(),
+            this.getWindowMargin());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
index 6cafb16..1e40309 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
@@ -16,13 +16,11 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-import java.util.Map;
-
 import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
 
 /**
- * @since Apr 5, 2016
- *
+ * @since Apr 5, 2016.
  */
 public class StreamingCluster {
     public static enum StreamingType {
@@ -38,7 +36,7 @@ public class StreamingCluster {
     @JsonProperty
     private String description;
     /**
-     * key - nimbus for storm
+     * key - nimbus for storm.
      */
     @JsonProperty
     private Map<String, String> deployments;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 13881a1..a503dcf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,15 +16,14 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.DateTimeUtil;
-
+import org.apache.commons.lang3.StringUtils;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * streamId stands for alert type instead of source event streamId
+ * streamId stands for alert type instead of source event streamId.
  */
 public class AlertStreamEvent extends StreamEvent {
     private static final long serialVersionUID = 2392131134670106397L;
@@ -45,15 +44,16 @@ public class AlertStreamEvent extends StreamEvent {
     @Override
     public String toString() {
         List<String> dataStrings = new ArrayList<>(this.getData().length);
-        for(Object obj: this.getData()){
-            if(obj!=null) {
+        for (Object obj : this.getData()) {
+            if (obj != null) {
                 dataStrings.add(obj.toString());
-            }else{
+            } else {
                 dataStrings.add(null);
             }
         }
         return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, createdBy=%s, metaVersion=%s]",
-                this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","),this.getPolicyId(),this.getCreatedBy(),this.getMetaVersion());
+            this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+            StringUtils.join(dataStrings, ","), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
     }
 
     public String getCreatedBy() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
index cfed3e2..51e4532 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -16,21 +16,19 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import java.io.Serializable;
-import java.util.Objects;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
 import backtype.storm.tuple.Tuple;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * This is a critical data structure across spout, router bolt and alert bolt
  * partition[StreamPartition] defines how one incoming data stream is partitioned, sorted
  * partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition
- * event[StreamEvent] is actual data
+ * event[StreamEvent] is actual data.
  */
-public class PartitionedEvent implements Serializable{
+public class PartitionedEvent implements Serializable {
     private static final long serialVersionUID = -3840016190614238593L;
     private StreamPartition partition;
     private long partitionKey;
@@ -38,11 +36,11 @@ public class PartitionedEvent implements Serializable{
 
     /**
      * Used for bolt-internal but not inter-bolts,
-     * will not pass across bolts
+     * will not pass across bolts.
      */
     private transient Tuple anchor;
 
-    public PartitionedEvent(){
+    public PartitionedEvent() {
         this.event = null;
         this.partition = null;
         this.partitionKey = 0L;
@@ -56,14 +54,18 @@ public class PartitionedEvent implements Serializable{
 
     @Override
     public boolean equals(Object obj) {
-        if(obj == this) return true;
-        if(obj == null) return false;
-        if(obj instanceof PartitionedEvent){
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (obj instanceof PartitionedEvent) {
             PartitionedEvent another = (PartitionedEvent) obj;
             return !(this.partitionKey != another.getPartitionKey()
-                    || !Objects.equals(this.event, another.getEvent())
-                    || !Objects.equals(this.partition, another.getPartition())
-                    || !Objects.equals(this.anchor, another.anchor));
+                || !Objects.equals(this.event, another.getEvent())
+                || !Objects.equals(this.partition, another.getPartition())
+                || !Objects.equals(this.anchor, another.anchor));
         } else {
             return false;
         }
@@ -72,10 +74,10 @@ public class PartitionedEvent implements Serializable{
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-                .append(partitionKey)
-                .append(event)
-                .append(partition)
-                .build();
+            .append(partitionKey)
+            .append(event)
+            .append(partition)
+            .build();
     }
 
     public StreamEvent getEvent() {
@@ -94,36 +96,36 @@ public class PartitionedEvent implements Serializable{
         this.partition = partition;
     }
 
-    public void setPartitionKey(long partitionKey){
+    public void setPartitionKey(long partitionKey) {
         this.partitionKey = partitionKey;
     }
 
-    public long getPartitionKey(){
+    public long getPartitionKey() {
         return this.partitionKey;
     }
 
-    public String toString(){
-       return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event,partitionKey);
+    public String toString() {
+        return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event, partitionKey);
     }
 
     public long getTimestamp() {
         return (event != null) ? event.getTimestamp() : 0L;
     }
 
-    public String getStreamId(){
+    public String getStreamId() {
         return (event != null) ? event.getStreamId() : null;
     }
 
-    public Object[] getData(){
-        return event!=null ? event.getData() : null;
+    public Object[] getData() {
+        return event != null ? event.getData() : null;
     }
 
-    public boolean isSortRequired(){
-        return isPartitionRequired() && this.getPartition().getSortSpec()!=null;
+    public boolean isSortRequired() {
+        return isPartitionRequired() && this.getPartition().getSortSpec() != null;
     }
 
-    public boolean isPartitionRequired(){
-        return this.getPartition()!=null;
+    public boolean isPartitionRequired() {
+        return this.getPartition() != null;
     }
 
     public PartitionedEvent copy() {
@@ -142,7 +144,7 @@ public class PartitionedEvent implements Serializable{
         this.anchor = anchor;
     }
 
-    public PartitionedEvent withAnchor(Tuple tuple){
+    public PartitionedEvent withAnchor(Tuple tuple) {
         this.setAnchor(tuple);
         return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 5f59b1e..d91b001 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -16,11 +16,10 @@
  */
 package org.apache.eagle.alert.engine.model;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.DateTimeUtil;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,8 +27,7 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * @since Apr 5, 2016
- *
+ * @since Apr 5, 2016.
  */
 public class StreamEvent implements Serializable {
     private static final long serialVersionUID = 2765116509856609763L;
@@ -39,15 +37,16 @@ public class StreamEvent implements Serializable {
     private long timestamp;
     private String metaVersion;
 
-    public StreamEvent(){}
+    public StreamEvent() {
+    }
 
-    public StreamEvent(String streamId,long timestamp,Object[] data){
+    public StreamEvent(String streamId, long timestamp, Object[] data) {
         this.setStreamId(streamId);
         this.setTimestamp(timestamp);
         this.setData(data);
     }
 
-    public StreamEvent(String streamId,long timestamp,Object[] data,String metaVersion){
+    public StreamEvent(String streamId, long timestamp, Object[] data, String metaVersion) {
         this.setStreamId(streamId);
         this.setTimestamp(timestamp);
         this.setData(data);
@@ -62,9 +61,6 @@ public class StreamEvent implements Serializable {
         this.streamId = streamId;
     }
 
-    public Object[] getData() {
-        return data;
-    }
 
     public void setData(Object[] data) {
         this.data = data;
@@ -93,10 +89,12 @@ public class StreamEvent implements Serializable {
 
     @Override
     public boolean equals(Object obj) {
-        if(obj == this) return true;
-        if(obj instanceof StreamEvent){
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof StreamEvent) {
             StreamEvent another = (StreamEvent) obj;
-            return Objects.equals(this.streamId,another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data,another.data);
+            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data);
         }
         return false;
     }
@@ -104,7 +102,7 @@ public class StreamEvent implements Serializable {
     @Override
     public String toString() {
         List<String> dataStrings = new ArrayList<>();
-        if(this.getData() != null) {
+        if (this.getData() != null) {
             for (Object obj : this.getData()) {
                 if (obj != null) {
                     dataStrings.add(obj.toString());
@@ -113,17 +111,21 @@ public class StreamEvent implements Serializable {
                 }
             }
         }
-        return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","), this.getMetaVersion());
+        return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
+            this.getStreamId(),
+            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+            StringUtils.join(dataStrings, ","),
+            this.getMetaVersion());
     }
 
-    public static StreamEventBuilder Builder(){
+    public static StreamEventBuilder builder() {
         return new StreamEventBuilder();
     }
 
     /**
-     * @return cloned new event object
+     * @return cloned new event object.
      */
-    public StreamEvent copy(){
+    public StreamEvent copy() {
         StreamEvent newEvent = new StreamEvent();
         newEvent.setTimestamp(this.getTimestamp());
         newEvent.setData(this.getData());
@@ -132,19 +134,18 @@ public class StreamEvent implements Serializable {
         return newEvent;
     }
 
-    public void copyFrom(StreamEvent event){
+    public void copyFrom(StreamEvent event) {
         this.setTimestamp(event.getTimestamp());
         this.setData(event.getData());
         this.setStreamId(event.getStreamId());
         this.setMetaVersion(event.getMetaVersion());
     }
 
-    /**
-     * @param column
-     * @param streamDefinition
-     * @return
-     */
-    public Object[] getData(StreamDefinition streamDefinition,List<String> column) {
+    public Object[] getData() {
+        return data;
+    }
+
+    public Object[] getData(StreamDefinition streamDefinition, List<String> column) {
         ArrayList<Object> result = new ArrayList<>(column.size());
         for (String colName : column) {
             result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
@@ -152,7 +153,7 @@ public class StreamEvent implements Serializable {
         return result.toArray();
     }
 
-    public Object[] getData(StreamDefinition streamDefinition,String ... column) {
+    public Object[] getData(StreamDefinition streamDefinition, String... column) {
         ArrayList<Object> result = new ArrayList<>(column.length);
         for (String colName : column) {
             result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
index 1036ba2..53101ef 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
@@ -25,62 +25,65 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class StreamEventBuilder{
-    private final static Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
+public class StreamEventBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
 
     private StreamEvent instance;
     private StreamDefinition streamDefinition;
-    public StreamEventBuilder(){
+
+    public StreamEventBuilder() {
         instance = new StreamEvent();
     }
 
-    public StreamEventBuilder schema(StreamDefinition streamDefinition){
+    public StreamEventBuilder schema(StreamDefinition streamDefinition) {
         this.streamDefinition = streamDefinition;
-        if(instance.getStreamId() == null) instance.setStreamId(streamDefinition.getStreamId());
+        if (instance.getStreamId() == null) {
+            instance.setStreamId(streamDefinition.getStreamId());
+        }
         return this;
     }
 
-    public StreamEventBuilder streamId(String streamId){
+    public StreamEventBuilder streamId(String streamId) {
         instance.setStreamId(streamId);
         return this;
     }
 
-    public StreamEventBuilder attributes(Map<String,Object> data, StreamDefinition streamDefinition){
+    public StreamEventBuilder attributes(Map<String, Object> data, StreamDefinition streamDefinition) {
         this.schema(streamDefinition);
         List<StreamColumn> columnList = streamDefinition.getColumns();
-        if(columnList!=null && columnList.size() > 0){
+        if (columnList != null && columnList.size() > 0) {
             List<Object> values = new ArrayList<>(columnList.size());
             for (StreamColumn column : columnList) {
-                values.add(data.getOrDefault(column.getName(),column.getDefaultValue()));
+                values.add(data.getOrDefault(column.getName(), column.getDefaultValue()));
             }
             instance.setData(values.toArray());
-        } else if(LOG.isDebugEnabled()){
-            LOG.warn("All data [{}] are ignored as no columns defined in schema {}",data,streamDefinition);
+        } else if (LOG.isDebugEnabled()) {
+            LOG.warn("All data [{}] are ignored as no columns defined in schema {}", data, streamDefinition);
         }
         return this;
     }
 
-    public StreamEventBuilder attributes(Map<String,Object> data){
-        return attributes(data,this.streamDefinition);
+    public StreamEventBuilder attributes(Map<String, Object> data) {
+        return attributes(data, this.streamDefinition);
     }
 
-    public StreamEventBuilder attributes(Object ... data){
+    public StreamEventBuilder attributes(Object... data) {
         instance.setData(data);
         return this;
     }
 
-    public StreamEventBuilder timestamep(long timestamp){
+    public StreamEventBuilder timestamep(long timestamp) {
         instance.setTimestamp(timestamp);
         return this;
     }
 
-    public StreamEventBuilder metaVersion(String metaVersion){
+    public StreamEventBuilder metaVersion(String metaVersion) {
         instance.setMetaVersion(metaVersion);
         return this;
     }
 
-    public StreamEvent build(){
-        if(instance.getStreamId() == null){
+    public StreamEvent build() {
+        if (instance.getStreamId() == null) {
             throw new IllegalArgumentException("streamId is null of event: " + instance);
         }
         return instance;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
index 06a99f4..461a23c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
@@ -1,14 +1,4 @@
-package org.apache.eagle.alert.metric;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.source.MetricSource;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -24,41 +14,48 @@ import com.typesafe.config.Config;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.metric;
+
+import org.apache.eagle.alert.metric.sink.MetricSink;
+import org.apache.eagle.alert.metric.source.MetricSource;
+
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import java.util.Map;
+
 public interface IMetricSystem {
 
     /**
-     * Initialize
+     * Initialize.
      */
     void start();
 
     /**
-     * Schedule reporter
+     * Schedule reporter.
      */
     void schedule();
 
     /**
-     * Close and stop all resources and services
+     * Close and stop all resources and services.
      */
     void stop();
 
     /**
-     * Manual report metric
+     * Manual report metric.
      */
     void report();
 
     /**
-     *
-     * @param sink metric sink
+     * @param sink metric sink.
      */
-    void register(MetricSink sink,Config config);
+    void register(MetricSink sink, Config config);
 
     /**
-     *
-     * @param source metric source
+     * @param source metric source.
      */
     void register(MetricSource source);
 
-    void tags(Map<String,Object> metricTags);
+    void tags(Map<String, Object> metricTags);
 
     MetricRegistry registry();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
index b91c606..555c4ec 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,74 +16,74 @@
  */
 package org.apache.eagle.alert.metric;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.eagle.alert.metric.sink.MetricSink;
 import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
 import org.apache.eagle.alert.metric.source.MetricSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class MetricSystem implements IMetricSystem {
     private final Config config;
-    private Map<MetricSink,Config> sinks = new HashMap<>();
-//    private Map<String,MetricSource> sources = new HashMap<>();
+    private Map<MetricSink, Config> sinks = new HashMap<>();
+    //    private Map<String,MetricSource> sources = new HashMap<>();
     private MetricRegistry registry = new MetricRegistry();
     private boolean running;
     private boolean initialized;
-    private final static Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
     private final Map<String, Object> metricTags = new HashMap<>();
 
-    public MetricSystem(Config config){
+    public MetricSystem(Config config) {
         this.config = config;
     }
 
-    public static MetricSystem load(Config config){
+    public static MetricSystem load(Config config) {
         MetricSystem instance = new MetricSystem(config);
         instance.loadFromConfig();
         return instance;
     }
 
     @Override
-    public void tags(Map<String,Object> metricTags){
+    public void tags(Map<String, Object> metricTags) {
         this.metricTags.putAll(metricTags);
     }
 
     @Override
     public void start() {
-        if(initialized)
+        if (initialized) {
             throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized");
-        sinks.forEach((sink,conf) -> sink.prepare(conf.withValue("tags",ConfigFactory.parseMap(metricTags).root()),registry));
+        }
+        sinks.forEach((sink, conf) -> sink.prepare(conf.withValue("tags", ConfigFactory.parseMap(metricTags).root()), registry));
         initialized = true;
     }
 
     @Override
     public void schedule() {
-        if(running){
-           throw  new IllegalStateException("Attempting to start a MetricsSystem that is already running");
+        if (running) {
+            throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
         }
 
-        sinks.keySet().forEach((sink)->sink.start(5, TimeUnit.SECONDS));
+        sinks.keySet().forEach((sink) -> sink.start(5, TimeUnit.SECONDS));
         running = true;
     }
 
-    public void loadFromConfig(){
+    public void loadFromConfig() {
         loadSinksFromConfig();
     }
 
-    private void loadSinksFromConfig(){
+    private void loadSinksFromConfig() {
         Config sinkCls = config.hasPath("metric.sink") ? config.getConfig("metric.sink") : null;
-        if(sinkCls == null){
+        if (sinkCls == null) {
             // do nothing
-        }else{
-            for(String sinkType:sinkCls.root().unwrapped().keySet()){
-                register(MetricSinkRepository.createSink(sinkType),config.getConfig("metric.sink."+sinkType));
+        } else {
+            for (String sinkType : sinkCls.root().unwrapped().keySet()) {
+                register(MetricSinkRepository.createSink(sinkType), config.getConfig("metric.sink." + sinkType));
             }
         }
     }
@@ -99,9 +99,9 @@ public class MetricSystem implements IMetricSystem {
     }
 
     @Override
-    public void register(MetricSink sink,Config config) {
-        LOG.debug("Register {}",sink);
-        sinks.put(sink,config);
+    public void register(MetricSink sink, Config config) {
+        LOG.debug("Register {}", sink);
+        sinks.put(sink, config);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
index b5e6c63..f1262c7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
@@ -16,55 +16,54 @@
  */
 package org.apache.eagle.alert.metric.entity;
 
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.eagle.alert.utils.DateTimeUtil;
-
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
+import java.util.Map;
+import java.util.TreeMap;
 
-public class MetricEvent extends TreeMap<String,Object>{
+public class MetricEvent extends TreeMap<String, Object> {
 
     private static final long serialVersionUID = 6862373651636342744L;
 
-    public static Builder of(String name){
+    public static Builder of(String name) {
         return new Builder(name);
     }
 
     /**
-     * TODO: Refactor according to ConsoleReporter
+     * TODO: Refactor according to ConsoleReporter.
      */
-    public static class Builder{
+    public static class Builder {
         private final String name;
         private MetricEvent instance;
-        public Builder(String name){
+
+        public Builder(String name) {
             this.instance = new MetricEvent();
             this.name = name;
         }
 
-        public Builder from(Counter value) {
-//            this.instance.put("type","counter");
-            this.instance.put("count",value.getCount());
-            return this;
-        }
-
-        public MetricEvent build(){
-            this.instance.put("name",name);
-            if(!this.instance.containsKey("timestamp")){
+        public MetricEvent build() {
+            this.instance.put("name", name);
+            if (!this.instance.containsKey("timestamp")) {
                 this.instance.put("timestamp", DateTimeUtil.getCurrentTimestamp());
             }
             return this.instance;
         }
 
-        @SuppressWarnings({ "rawtypes", "unchecked" })
+        public Builder from(Counter value) {
+            // this.instance.put("type","counter");
+            this.instance.put("count", value.getCount());
+            return this;
+        }
+
+        @SuppressWarnings( {"rawtypes", "unchecked"})
         public Builder from(Gauge gauge) {
             Object value = gauge.getValue();
-            if( value instanceof Map){
+            if (value instanceof Map) {
                 Map<? extends String, ?> map = (Map<? extends String, ?>) value;
                 this.instance.putAll(map);
             } else {
@@ -74,7 +73,7 @@ public class MetricEvent extends TreeMap<String,Object>{
         }
 
         public Builder from(Histogram value) {
-            this.instance.put("count",value.getCount());
+            this.instance.put("count", value.getCount());
             Snapshot snapshot = value.getSnapshot();
             this.instance.put("min", snapshot.getMin());
             this.instance.put("max", snapshot.getMax());
@@ -90,21 +89,21 @@ public class MetricEvent extends TreeMap<String,Object>{
         }
 
         public Builder from(Meter value) {
-            this.instance.put("value",value.getCount());
-            this.instance.put("15MinRate",value.getFifteenMinuteRate());
-            this.instance.put("5MinRate",value.getFiveMinuteRate());
-            this.instance.put("mean",value.getMeanRate());
-            this.instance.put("1MinRate",value.getOneMinuteRate());
+            this.instance.put("value", value.getCount());
+            this.instance.put("15MinRate", value.getFifteenMinuteRate());
+            this.instance.put("5MinRate", value.getFiveMinuteRate());
+            this.instance.put("mean", value.getMeanRate());
+            this.instance.put("1MinRate", value.getOneMinuteRate());
             return this;
         }
 
         public Builder from(Timer value) {
-//            this.instance.put("type","timer");
-            this.instance.put("value",value.getCount());
-            this.instance.put("15MinRate",value.getFifteenMinuteRate());
-            this.instance.put("5MinRate",value.getFiveMinuteRate());
-            this.instance.put("mean",value.getMeanRate());
-            this.instance.put("1MinRate",value.getOneMinuteRate());
+            // this.instance.put("type","timer");
+            this.instance.put("value", value.getCount());
+            this.instance.put("15MinRate", value.getFifteenMinuteRate());
+            this.instance.put("5MinRate", value.getFiveMinuteRate());
+            this.instance.put("mean", value.getMeanRate());
+            this.instance.put("1MinRate", value.getOneMinuteRate());
             return this;
         }
     }