Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:06 UTC

[05/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
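Note: "enable failOnViolation" refers to the maven-checkstyle-plugin setting that makes the build fail when style violations are found. The actual pom.xml change is not part of this excerpt; the snippet below is only a minimal sketch of how that flag is typically wired into the plugin's check goal. The plugin version, execution id, phase, and rule file name are assumptions for illustration, not values taken from this commit.

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-checkstyle-plugin</artifactId>
      <executions>
        <execution>
          <!-- execution id and phase are illustrative, not from this commit -->
          <id>checkstyle-check</id>
          <phase>validate</phase>
          <goals>
            <goal>check</goal>
          </goals>
          <configuration>
            <!-- fail the build on any checkstyle violation -->
            <failOnViolation>true</failOnViolation>
            <!-- rule file name is hypothetical; the module's real config may differ -->
            <configLocation>checkstyle.xml</configLocation>
          </configuration>
        </execution>
      </executions>
    </plugin>
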

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index f50ad15..fe70630 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -28,20 +28,20 @@ import org.wso2.siddhi.core.util.EventPrinter;
  */
 public class TestNoDataAlert {
     @Test
-    public void test() throws Exception{
-        String[] expectHosts = new String[]{"host_1","host_2","host_3","host_4","host_5","host_6","host_7","host_8"};
+    public void test() throws Exception {
+        String[] expectHosts = new String[] {"host_1", "host_2", "host_3", "host_4", "host_5", "host_6", "host_7", "host_8"};
 //        String[] appearHosts = new String[]{"host_6","host_7","host_8"};
 //        String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"};
 
         ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-                "define stream appearStream (key string, src string);"+
-                        "define stream expectStream (key string, src string);"+
-                        "define table expectTable (key string, src string);"+
-                        "define trigger fiveSecTriggerStream at every 1 sec;"+
-                        "define trigger initAppearTriggerStream at 'start';"+
-                        "from expectStream insert into expectTable;"+
-                        "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;"+
-                        "from initAppearTriggerStream join expectTable insert into initAppearStream;"
+            "define stream appearStream (key string, src string);" +
+                "define stream expectStream (key string, src string);" +
+                "define table expectTable (key string, src string);" +
+                "define trigger fiveSecTriggerStream at every 1 sec;" +
+                "define trigger initAppearTriggerStream at 'start';" +
+                "from expectStream insert into expectTable;" +
+                "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;" +
+                "from initAppearTriggerStream join expectTable insert into initAppearStream;"
 //                        "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" +
 //                        "from joinStream[k2 is null] select k1 insert current events into missingStream;"
         );
@@ -65,8 +65,8 @@ public class TestNoDataAlert {
         });
 
         runtime.start();
-        for(String host: expectHosts) {
-            runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{host,"expectStream"});
+        for (String host : expectHosts) {
+            runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {host, "expectStream"});
         }
 
 //        for(String host:appearHosts) {
@@ -83,17 +83,17 @@ public class TestNoDataAlert {
 
     /**
      * only alert when the successive 2 events has number of missing blocks changed
-     *from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
+     * from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
      */
     @Test
-    public void testMissingBlock() throws Exception{
+    public void testMissingBlock() throws Exception {
         ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-                "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+
-                        "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+
-                        "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+
-                        "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+
-                        "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
-                        "b.site as site insert into outputStream;"
+            "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);" +
+                "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> " +
+                "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and " +
+                "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, " +
+                "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
+                "b.site as site insert into outputStream;"
         );
 
         runtime.addCallback("outputStream", new StreamCallback() {
@@ -104,9 +104,9 @@ public class TestNoDataAlert {
         });
 
         runtime.start();
-        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
-        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
-        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
+        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
+        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
+        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
 
 
         Thread.sleep(5000);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
index 6305da8..5564b90 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.engine.nodata;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
@@ -33,6 +29,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Since 6/29/16.
  */
@@ -42,13 +42,13 @@ public class TestNoDataPolicyHandler {
     private static final String outputStream = "testOutputStream";
 
     @Test
-    public void test() throws Exception{
+    public void test() throws Exception {
         test(buildPolicyDef_provided());
         test(buildPolicyDef_dynamic());
     }
 
     @SuppressWarnings("unchecked")
-    public void test(PolicyDefinition pd) throws Exception{
+    public void test(PolicyDefinition pd) throws Exception {
         Map<String, StreamDefinition> sds = new HashMap<>();
         StreamDefinition sd = buildStreamDef();
         sds.put("testInputStream", sd);
@@ -71,17 +71,17 @@ public class TestNoDataPolicyHandler {
     }
 
     @SuppressWarnings("rawtypes")
-    private static class TestCollector implements Collector{
+    private static class TestCollector implements Collector {
         @Override
         public void emit(Object o) {
-            AlertStreamEvent e = (AlertStreamEvent)o;
+            AlertStreamEvent e = (AlertStreamEvent) o;
             Object[] data = e.getData();
             Assert.assertEquals("host2", data[1]);
             LOG.info(e.toString());
         }
     }
 
-    private PolicyDefinition buildPolicyDef_provided(){
+    private PolicyDefinition buildPolicyDef_provided() {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("PT1M,provided,1,host,host1,host2");
@@ -93,7 +93,7 @@ public class TestNoDataPolicyHandler {
         return pd;
     }
 
-    private PolicyDefinition buildPolicyDef_dynamic(){
+    private PolicyDefinition buildPolicyDef_dynamic() {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("PT1M,dynamic,1,host");
@@ -104,7 +104,8 @@ public class TestNoDataPolicyHandler {
         pd.setName("nodataalert-test");
         return pd;
     }
-    private StreamDefinition buildStreamDef(){
+
+    private StreamDefinition buildStreamDef() {
         StreamDefinition sd = new StreamDefinition();
         StreamColumn tsColumn = new StreamColumn();
         tsColumn.setName("timestamp");
@@ -124,9 +125,9 @@ public class TestNoDataPolicyHandler {
         return sd;
     }
 
-    private StreamEvent buildStreamEvt(long ts, String host, double value){
+    private StreamEvent buildStreamEvt(long ts, String host, double value) {
         StreamEvent e = new StreamEvent();
-        e.setData(new Object[]{ts, host, value});
+        e.setData(new Object[] {ts, host, value});
         e.setStreamId(inputStream);
         e.setTimestamp(ts);
         return e;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
index 02d19b4..84844e7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.engine.nodata;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
@@ -34,125 +30,129 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 public class TestNoDataPolicyTimeBatchHandler {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
-	
-	private static final String inputStream = "testInputStream";
-	private static final String outputStream = "testOutputStream";
-
-	@Before
-	public void setup() {
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testDynamic1() throws Exception {
-		Map<String, StreamDefinition> sds = new HashMap<>();
-		sds.put("testInputStream", buildStreamDef());
-		sds.put("testOutputStream", buildOutputStreamDef());
-		NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds);
-
-		PolicyHandlerContext context = new PolicyHandlerContext();
-		context.setPolicyDefinition(buildPolicyDef_dynamic());
-		handler.prepare(new TestCollector(), context);
-
-		long now = System.currentTimeMillis();
-		
-		handler.send(buildStreamEvt(now, "host1", 12.5));
-		
-		Thread.sleep(2000);
-		
-		handler.send(buildStreamEvt(now, "host2", 12.6));
-		handler.send(buildStreamEvt(now, "host1", 20.9));
-		handler.send(buildStreamEvt(now, "host2", 22.1));
-		handler.send(buildStreamEvt(now, "host2", 22.1));
-		
-		Thread.sleep(5000);
-		
-		handler.send(buildStreamEvt(now, "host2", 22.1));
-		handler.send(buildStreamEvt(now, "host2", 22.3));
-		
-		Thread.sleep(5000);
-		
-		handler.send(buildStreamEvt(now, "host2", 22.9));
-		handler.send(buildStreamEvt(now, "host1", 41.6));
-		handler.send(buildStreamEvt(now, "host2", 45.6));
-		
-		Thread.sleep(1000);
-	}
-	
-	@SuppressWarnings("rawtypes")
-    private static class TestCollector implements Collector{
+    private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
+
+    private static final String inputStream = "testInputStream";
+    private static final String outputStream = "testOutputStream";
+
+    @Before
+    public void setup() {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testDynamic1() throws Exception {
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put("testInputStream", buildStreamDef());
+        sds.put("testOutputStream", buildOutputStreamDef());
+        NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds);
+
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(buildPolicyDef_dynamic());
+        handler.prepare(new TestCollector(), context);
+
+        long now = System.currentTimeMillis();
+
+        handler.send(buildStreamEvt(now, "host1", 12.5));
+
+        Thread.sleep(2000);
+
+        handler.send(buildStreamEvt(now, "host2", 12.6));
+        handler.send(buildStreamEvt(now, "host1", 20.9));
+        handler.send(buildStreamEvt(now, "host2", 22.1));
+        handler.send(buildStreamEvt(now, "host2", 22.1));
+
+        Thread.sleep(5000);
+
+        handler.send(buildStreamEvt(now, "host2", 22.1));
+        handler.send(buildStreamEvt(now, "host2", 22.3));
+
+        Thread.sleep(5000);
+
+        handler.send(buildStreamEvt(now, "host2", 22.9));
+        handler.send(buildStreamEvt(now, "host1", 41.6));
+        handler.send(buildStreamEvt(now, "host2", 45.6));
+
+        Thread.sleep(1000);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static class TestCollector implements Collector {
         @Override
         public void emit(Object o) {
-            AlertStreamEvent e = (AlertStreamEvent)o;
+            AlertStreamEvent e = (AlertStreamEvent) o;
             Object[] data = e.getData();
-            
+
             LOG.info("alert data: {}, {}", data[1], data[0]);
-            
+
             Assert.assertEquals("host1", data[1]);
         }
     }
 
-	private PolicyDefinition buildPolicyDef_dynamic() {
-		PolicyDefinition pd = new PolicyDefinition();
-		PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-		def.setValue("PT5S,dynamic,1,host");
-		def.setType("nodataalert");
-		pd.setDefinition(def);
-		pd.setInputStreams(Arrays.asList(inputStream));
-		pd.setOutputStreams(Arrays.asList(outputStream));
-		pd.setName("nodataalert-test");
-		return pd;
-	}
-
-	private StreamDefinition buildStreamDef() {
-		StreamDefinition sd = new StreamDefinition();
-		StreamColumn tsColumn = new StreamColumn();
-		tsColumn.setName("timestamp");
-		tsColumn.setType(StreamColumn.Type.LONG);
-
-		StreamColumn hostColumn = new StreamColumn();
-		hostColumn.setName("host");
-		hostColumn.setType(StreamColumn.Type.STRING);
-
-		StreamColumn valueColumn = new StreamColumn();
-		valueColumn.setName("value");
-		valueColumn.setType(StreamColumn.Type.DOUBLE);
-
-		sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-		sd.setDataSource("testDataSource");
-		sd.setStreamId("testInputStream");
-		return sd;
-	}
-	
-	private StreamDefinition buildOutputStreamDef() {
-		StreamDefinition sd = new StreamDefinition();
-		StreamColumn tsColumn = new StreamColumn();
-		tsColumn.setName("timestamp");
-		tsColumn.setType(StreamColumn.Type.LONG);
-
-		StreamColumn hostColumn = new StreamColumn();
-		hostColumn.setName("host");
-		hostColumn.setType(StreamColumn.Type.STRING);
-
-		StreamColumn valueColumn = new StreamColumn();
-		valueColumn.setName("originalStreamName");
-		valueColumn.setType(StreamColumn.Type.STRING);
-
-		sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-		sd.setDataSource("testDataSource");
-		sd.setStreamId("testOutputStream");
-		return sd;
-	}
-
-	private StreamEvent buildStreamEvt(long ts, String host, double value) {
-		StreamEvent e = new StreamEvent();
-		e.setData(new Object[] { ts, host, value });
-		e.setStreamId(inputStream);
-		e.setTimestamp(ts);
-		return e;
-	}
+    private PolicyDefinition buildPolicyDef_dynamic() {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("PT5S,dynamic,1,host");
+        def.setType("nodataalert");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList(inputStream));
+        pd.setOutputStreams(Arrays.asList(outputStream));
+        pd.setName("nodataalert-test");
+        return pd;
+    }
+
+    private StreamDefinition buildStreamDef() {
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn valueColumn = new StreamColumn();
+        valueColumn.setName("value");
+        valueColumn.setType(StreamColumn.Type.DOUBLE);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+        sd.setDataSource("testDataSource");
+        sd.setStreamId("testInputStream");
+        return sd;
+    }
+
+    private StreamDefinition buildOutputStreamDef() {
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn valueColumn = new StreamColumn();
+        valueColumn.setName("originalStreamName");
+        valueColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+        sd.setDataSource("testDataSource");
+        sd.setStreamId("testOutputStream");
+        return sd;
+    }
+
+    private StreamEvent buildStreamEvt(long ts, String host, double value) {
+        StreamEvent e = new StreamEvent();
+        e.setData(new Object[] {ts, host, value});
+        e.setStreamId(inputStream);
+        e.setTimestamp(ts);
+        return e;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
index f2027b2..77ab9c3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
@@ -16,12 +16,9 @@
  */
 package org.apache.eagle.alert.engine.perf;
 
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
@@ -31,23 +28,26 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Since 5/13/16.
  */
 public class TestSerDeserPer {
     Object[] data = null;
+
     @Before
-    public void before(){
+    public void before() {
         int max = 100;
         StringBuilder sb = new StringBuilder();
-        for(int i=0; i<max; i++){
+        for (int i = 0; i < max; i++) {
             sb.append("a");
         }
-        data = new Object[]{sb.toString()};
+        data = new Object[] {sb.toString()};
     }
 
     private String getTmpPath() {
@@ -55,11 +55,11 @@ public class TestSerDeserPer {
     }
 
     @Test
-    public void testSerDeserPerf() throws Exception{
+    public void testSerDeserPerf() throws Exception {
         Kryo kryo = new Kryo();
         String outputPath = FilenameUtils.concat(getTmpPath(), "file.bin");
         Output output = new Output(new FileOutputStream(outputPath));
-        for(int i=0; i<1000; i++){
+        for (int i = 0; i < 1000; i++) {
             kryo.writeObject(output, constructPE());
         }
         output.close();
@@ -69,7 +69,7 @@ public class TestSerDeserPer {
         Assert.assertTrue(someObject.getData().length == 1);
     }
 
-    private PartitionedEvent constructPE(){
+    private PartitionedEvent constructPE() {
         StreamEvent e = new StreamEvent();
         e.setStreamId("testStreamId");
         e.setTimestamp(1463159382000L);
@@ -92,11 +92,11 @@ public class TestSerDeserPer {
     }
 
     @Test
-    public void testSerDeserPerf2() throws Exception{
+    public void testSerDeserPerf2() throws Exception {
         Kryo kryo = new Kryo();
         String outputPath = FilenameUtils.concat(getTmpPath(), "file2.bin");
         Output output = new Output(new FileOutputStream(outputPath));
-        for(int i=0; i<1000; i++){
+        for (int i = 0; i < 1000; i++) {
             kryo.writeObject(output, constructNewPE());
         }
         output.close();
@@ -106,7 +106,7 @@ public class TestSerDeserPer {
         Assert.assertTrue(someObject.getData().length == 1);
     }
 
-    private NewPartitionedEvent constructNewPE(){
+    private NewPartitionedEvent constructNewPE() {
         NewPartitionedEvent pe = new NewPartitionedEvent();
         pe.setStreamId("testStreamId");
         pe.setTimestamp(1463159382000L);
@@ -124,11 +124,11 @@ public class TestSerDeserPer {
     }
 
     @Test
-    public void testSerDeserPerf3() throws Exception{
+    public void testSerDeserPerf3() throws Exception {
         Kryo kryo = new Kryo();
         String outputPath = FilenameUtils.concat(getTmpPath(), "file3.bin");
         Output output = new Output(new FileOutputStream(outputPath));
-        for(int i=0; i<1000; i++){
+        for (int i = 0; i < 1000; i++) {
             kryo.writeObject(output, constructNewPE2());
         }
         output.close();
@@ -138,7 +138,7 @@ public class TestSerDeserPer {
         Assert.assertTrue(someObject.getData().length == 1);
     }
 
-    private NewPartitionedEvent2 constructNewPE2(){
+    private NewPartitionedEvent2 constructNewPE2() {
         NewPartitionedEvent2 pe = new NewPartitionedEvent2();
         pe.setStreamId(100);
         pe.setTimestamp(1463159382000L);
@@ -168,10 +168,10 @@ public class TestSerDeserPer {
         private long partitionKey;
 
         // sort spec
-        private String windowPeriod="";
+        private String windowPeriod = "";
         private long windowMargin = 30 * 1000;
 
-        public NewPartitionedEvent(){
+        public NewPartitionedEvent() {
         }
 
         public String getStreamId() {
@@ -255,7 +255,7 @@ public class TestSerDeserPer {
         private long windowPeriod;
         private long windowMargin = 30 * 1000;
 
-        public NewPartitionedEvent2(){
+        public NewPartitionedEvent2() {
         }
 
         public int getStreamId() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index f3548d8..4bec98d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -58,7 +58,7 @@ import static org.mockito.Mockito.when;
 /**
  * Since 5/2/16.
  */
-@SuppressWarnings({"rawtypes", "unused"})
+@SuppressWarnings( {"rawtypes", "unused"})
 public class TestAlertBolt {
 
     public static final String TEST_STREAM = "test-stream";
@@ -66,22 +66,20 @@ public class TestAlertBolt {
     /**
      * Following knowledge is guaranteed in
      *
+     * @throws Exception Add test case: 2 alerts should be generated even if they are very close to each other in timestamp
      * @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{
-     *    if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
-     *      throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
-     *    }
+     * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
+     * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
+     * }
      * }
-     *
-     * @throws Exception
-     *
-     * Add test case: 2 alerts should be generated even if they are very close to each other in timestamp
      */
     @Test
-    public void testAlertBolt() throws Exception{
+    public void testAlertBolt() throws Exception {
         final AtomicInteger alertCount = new AtomicInteger();
         final Semaphore mutex = new Semaphore(0);
-        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
             int count = 0;
+
             @Override
             public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
                 alertCount.incrementAndGet();
@@ -91,14 +89,22 @@ public class TestAlertBolt {
                 System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
                 return null;
             }
+
             @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
             @Override
-            public void ack(Tuple input) {            }
+            public void ack(Tuple input) {
+            }
+
             @Override
-            public void fail(Tuple input) {            }
+            public void fail(Tuple input) {
+            }
+
             @Override
-            public void reportError(Throwable error) {            }
+            public void reportError(Throwable error) {
+            }
         });
         AlertBolt bolt = createAlertBolt(collector);
 
@@ -143,27 +149,27 @@ public class TestAlertBolt {
 
         // construct event with "value1"
         StreamEvent event1 = new StreamEvent();
-        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
         event1.setMetaVersion("version1");
-        Object[] data = new Object[]{"value1"};
+        Object[] data = new Object[] {"value1"};
         event1.setData(data);
         event1.setStreamId(streamId);
-        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001);
+        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001);
 
         // construct another event with "value1"
         StreamEvent event2 = new StreamEvent();
-        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
         event2.setMetaVersion("version1");
-        data = new Object[]{"value2"};
+        data = new Object[] {"value2"};
         event2.setData(data);
         event2.setStreamId(streamId);
-        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001);
+        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001);
 
         Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
         Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
         bolt.execute(input);
         bolt.execute(input2);
-        Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
+        Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
         Assert.assertEquals(2, alertCount.get());
         bolt.cleanup();
     }
@@ -183,8 +189,9 @@ public class TestAlertBolt {
     @Test
     public void testMetadataMismatch() throws Exception {
         AtomicInteger failedCount = new AtomicInteger();
-        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
             int count = 0;
+
             @Override
             public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
                 Assert.assertEquals("testAlertStream", tuple.get(0));
@@ -192,14 +199,23 @@ public class TestAlertBolt {
                 System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
                 return null;
             }
+
             @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
             @Override
-            public void ack(Tuple input) {            }
+            public void ack(Tuple input) {
+            }
+
             @Override
-            public void fail(Tuple input) {      failedCount.incrementAndGet();      }
+            public void fail(Tuple input) {
+                failedCount.incrementAndGet();
+            }
+
             @Override
-            public void reportError(Throwable error) {            }
+            public void reportError(Throwable error) {
+            }
         });
         AlertBolt bolt = createAlertBolt(collector);
 
@@ -267,8 +283,9 @@ public class TestAlertBolt {
     @Test
     public void testMetaversionConflict() throws Exception {
         AtomicInteger failedCount = new AtomicInteger();
-        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
             int count = 0;
+
             @Override
             public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
                 Assert.assertEquals("testAlertStream", tuple.get(0));
@@ -276,14 +293,23 @@ public class TestAlertBolt {
                 System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
                 return null;
             }
+
             @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
             @Override
-            public void ack(Tuple input) {            }
+            public void ack(Tuple input) {
+            }
+
             @Override
-            public void fail(Tuple input) {      failedCount.incrementAndGet();      }
+            public void fail(Tuple input) {
+                failedCount.incrementAndGet();
+            }
+
             @Override
-            public void reportError(Throwable error) {            }
+            public void reportError(Throwable error) {
+            }
         });
         AlertBolt bolt = createAlertBolt(collector);
 
@@ -374,21 +400,25 @@ public class TestAlertBolt {
             }
 
             @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {}
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
 
             @Override
-            public void ack(Tuple input) {}
+            public void ack(Tuple input) {
+            }
 
             @Override
-            public void fail(Tuple input) {}
+            public void fail(Tuple input) {
+            }
 
             @Override
-            public void reportError(Throwable error) {}
+            public void reportError(Throwable error) {
+            }
         });
         AlertBolt bolt = createAlertBolt(collector);
 
         boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
-        boltSpecs.setVersion("spec_"+System.currentTimeMillis());
+        boltSpecs.setVersion("spec_" + System.currentTimeMillis());
         // stream def map
         Map<String, StreamDefinition> sds = new HashMap();
         StreamDefinition sdTest = new StreamDefinition();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 1e52036..61a0aba 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -66,13 +66,13 @@ public class TestAlertPublisherBolt {
         publisher.nextEvent(event1);
     }
 
-    private AlertStreamEvent create(String streamId){
+    private AlertStreamEvent create(String streamId) {
         AlertStreamEvent alert = new AlertStreamEvent();
         PolicyDefinition policy = new PolicyDefinition();
         policy.setName("policy1");
         alert.setPolicyId(policy.getName());
         alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[]{"field_1", 2, "field_3"});
+        alert.setData(new Object[] {"field_1", 2, "field_3"});
         alert.setStreamId(streamId);
         alert.setCreatedBy(this.toString());
         return alert;
@@ -165,13 +165,13 @@ public class TestAlertPublisherBolt {
         return l;
     }
 
-    private AlertStreamEvent createWithStreamDef(String hostname, String appName){
+    private AlertStreamEvent createWithStreamDef(String hostname, String appName) {
         AlertStreamEvent alert = new AlertStreamEvent();
         PolicyDefinition policy = new PolicyDefinition();
         policy.setName("perfmon_cpu_host_check");
         alert.setPolicyId(policy.getName());
         alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[]{appName, hostname});
+        alert.setData(new Object[] {appName, hostname});
         alert.setStreamId("testAlertStream");
         alert.setCreatedBy(this.toString());
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
index 13550e1..830f1f7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -16,19 +16,16 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
@@ -40,7 +37,6 @@ import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.apache.eagle.alert.utils.StreamIdConversion;
 import org.joda.time.Period;
@@ -49,43 +45,39 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
+import java.io.IOException;
+import java.util.*;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestStreamRouterBolt {
     private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
 
     /**
      * Mocked 5 Events
-     *
+     * <p>
      * 1. Sent in random order:
      * "value1","value2","value3","value4","value5"
-     *
+     * <p>
      * 2. Received correct time order and value5 is thrown because too late: "value2","value1","value3","value4"
      *
      * @throws Exception
      */
     @SuppressWarnings("rawtypes")
     @Test
-    public void testRouterWithSortAndRouteSpec() throws Exception{
+    public void testRouterWithSortAndRouteSpec() throws Exception {
         Config config = ConfigFactory.load();
         MockChangeService mockChangeService = new MockChangeService();
         StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService);
 
-        final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
+        final Map<String, List<PartitionedEvent>> streamCollected = new HashMap<>();
         final List<PartitionedEvent> orderCollected = new ArrayList<>();
 
-        OutputCollector collector = new OutputCollector(new IOutputCollector(){
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
             int count = 0;
+
             @Override
             public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
                 PartitionedEvent event;
@@ -94,27 +86,37 @@ public class TestStreamRouterBolt {
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
-                if(count == 0) {
+                if (count == 0) {
                     count++;
                 }
                 LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-                if(!streamCollected.containsKey(streamId)){
-                    streamCollected.put(streamId,new ArrayList<>());
+                if (!streamCollected.containsKey(streamId)) {
+                    streamCollected.put(streamId, new ArrayList<>());
                 }
                 streamCollected.get(streamId).add(event);
                 orderCollected.add(event);
                 return null;
             }
+
             @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
             @Override
-            public void ack(Tuple input) {            }
+            public void ack(Tuple input) {
+            }
+
             @Override
-            public void fail(Tuple input) {            }
+            public void fail(Tuple input) {
+            }
+
             @SuppressWarnings("unused")
-            public void resetTimeout(Tuple input) {            }
+            public void resetTimeout(Tuple input) {
+            }
+
             @Override
-            public void reportError(Throwable error) {            }
+            public void reportError(Throwable error) {
+            }
         });
 
         Map stormConf = new HashMap<>();
@@ -144,7 +146,7 @@ public class TestStreamRouterBolt {
         routerSpec.setStreamId(streamId);
         PolicyWorkerQueue queue = new PolicyWorkerQueue();
         queue.setPartition(sp);
-        queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
+        queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt1"), new WorkSlot("testTopology", "alertBolt2")));
         routerSpec.setTargetQueue(Collections.singletonList(queue));
         boltSpec.addRouterSpec(routerSpec);
         boltSpec.setVersion("version1");
@@ -178,8 +180,8 @@ public class TestStreamRouterBolt {
 
         // construct event with "value1"
         StreamEvent event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000);
-        Object[] data = new Object[]{"value1"};
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30") * 1000);
+        Object[] data = new Object[] {"value1"};
         event.setData(data);
         event.setStreamId(streamId);
         PartitionedEvent pEvent = new PartitionedEvent();
@@ -190,8 +192,8 @@ public class TestStreamRouterBolt {
 
         // construct another event with "value2"
         event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000);
-        data = new Object[]{"value2"};
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10") * 1000);
+        data = new Object[] {"value2"};
         event.setData(data);
         event.setStreamId(streamId);
         pEvent = new PartitionedEvent();
@@ -202,8 +204,8 @@ public class TestStreamRouterBolt {
 
         // construct another event with "value3"
         event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000);
-        data = new Object[]{"value3"};
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40") * 1000);
+        data = new Object[] {"value3"};
         event.setData(data);
         event.setStreamId(streamId);
         pEvent = new PartitionedEvent();
@@ -214,8 +216,8 @@ public class TestStreamRouterBolt {
 
         // construct another event with "value4"
         event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000);
-        data = new Object[]{"value4"};
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10") * 1000);
+        data = new Object[] {"value4"};
         event.setData(data);
         event.setStreamId(streamId);
         pEvent = new PartitionedEvent();
@@ -226,8 +228,8 @@ public class TestStreamRouterBolt {
 
         // construct another event with "value5", which will be thrown because two late
         event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000);
-        data = new Object[]{"value5"};
+        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10") * 1000);
+        data = new Object[] {"value5"};
         event.setData(data);
         event.setStreamId(streamId);
         pEvent = new PartitionedEvent();
@@ -236,14 +238,14 @@ public class TestStreamRouterBolt {
         input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
         routerBolt.execute(input);
 
-        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
-        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1",streamCollected.keySet().contains(
-                String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
-        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2",streamCollected.keySet().contains(
-                String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
+        Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
+        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1", streamCollected.keySet().contains(
+            String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
+        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2", streamCollected.keySet().contains(
+            String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
 
-        Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
-        Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+        Assert.assertEquals("Should finally collect 3 events", 3, orderCollected.size());
+        Assert.assertArrayEquals("Should sort 3 events in ASC order", new String[] {"value2", "value1", "value3"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
 
         // The first 3 events are ticked automatically by window
 
@@ -252,14 +254,14 @@ public class TestStreamRouterBolt {
         // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
         // The 5th event will be thrown because too late and out of margin
 
-        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
-        Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size());
-        Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+        Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
+        Assert.assertEquals("Should finally collect 3 events", 4, orderCollected.size());
+        Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp", new String[] {"value2", "value1", "value3", "value4"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
 
     }
 
     @SuppressWarnings("serial")
-    public static class MockChangeService extends AbstractMetadataChangeNotifyService{
+    public static class MockChangeService extends AbstractMetadataChangeNotifyService {
         private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
index 13d1015..a3939cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
@@ -1,10 +1,5 @@
 package org.apache.eagle.alert.engine.serialization;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -17,6 +12,11 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -37,18 +37,18 @@ public class JavaSerializationTest {
     private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
 
     @Test
-    public void testJavaSerialization(){
+    public void testJavaSerialization() {
         PartitionedEvent partitionedEvent = new PartitionedEvent();
         partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
-        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host")));
+        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream", Arrays.asList("name", "host")));
         StreamEvent event = new StreamEvent();
         event.setStreamId("sampleStream");
         event.setTimestamp(System.currentTimeMillis());
-        event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0});
+        event.setData(new Object[] {"CPU", "LOCALHOST", true, Long.MAX_VALUE, 60.0});
         partitionedEvent.setEvent(event);
 
         int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
-        LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent);
+        LOG.info("Java serialization length: {}, event: {}", javaSerializationLength, partitionedEvent);
 
         int compactLength = 0;
         compactLength += "sampleStream".getBytes().length;
@@ -60,17 +60,17 @@ public class JavaSerializationTest {
         compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
         compactLength += ByteUtils.doubleToBytes(60.0).length;
 
-        LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent);
+        LOG.info("Compact serialization length: {}, event: {}", compactLength, partitionedEvent);
         Assert.assertTrue(compactLength * 20 < javaSerializationLength);
     }
 
 
-    public static StreamDefinition createSampleStreamDefinition(String streamId){
+    public static StreamDefinition createSampleStreamDefinition(String streamId) {
         StreamDefinition sampleStreamDefinition = new StreamDefinition();
         sampleStreamDefinition.setStreamId(streamId);
         sampleStreamDefinition.setTimeseries(true);
         sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for "+streamId);
+        sampleStreamDefinition.setDescription("Schema for " + streamId);
         List<StreamColumn> streamColumns = new ArrayList<>();
 
         streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
@@ -82,7 +82,7 @@ public class JavaSerializationTest {
         return sampleStreamDefinition;
     }
 
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
+    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) {
         StreamPartition streamPartition = new StreamPartition();
         streamPartition.setStreamId(streamId);
         streamPartition.setColumns(groupByField);
@@ -91,22 +91,22 @@ public class JavaSerializationTest {
     }
 
     @SuppressWarnings("serial")
-    public static PartitionedEvent createSimpleStreamEvent()  {
-        StreamEvent event = StreamEvent.Builder()
-                    .schema(createSampleStreamDefinition("sampleStream_1"))
-                    .streamId("sampleStream_1")
-                    .timestamep(System.currentTimeMillis())
-                    .attributes(new HashMap<String,Object>(){{
-                        put("name","cpu");
-                        put("host","localhost");
-                        put("flag",true);
-                        put("value",60.0);
-                        put("data",Long.MAX_VALUE);
-                        put("unknown","unknown column value");
-                    }}).build();
+    public static PartitionedEvent createSimpleStreamEvent() {
+        StreamEvent event = StreamEvent.builder()
+            .schema(createSampleStreamDefinition("sampleStream_1"))
+            .streamId("sampleStream_1")
+            .timestamep(System.currentTimeMillis())
+            .attributes(new HashMap<String, Object>() {{
+                put("name", "cpu");
+                put("host", "localhost");
+                put("flag", true);
+                put("value", 60.0);
+                put("data", Long.MAX_VALUE);
+                put("unknown", "unknown column value");
+            }}).build();
         PartitionedEvent pEvent = new PartitionedEvent();
         pEvent.setEvent(event);
-        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host")));
+        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name", "host")));
         return pEvent;
     }
 }
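
The test above contrasts default Java serialization of a PartitionedEvent with a hand-computed compact byte count. The standalone sketch below reproduces that comparison outside Eagle, assuming only commons-lang3 on the classpath; the SampleEvent class and its field values are invented for illustration and are not part of the module.

    import org.apache.commons.lang3.SerializationUtils;

    import java.io.Serializable;
    import java.nio.charset.StandardCharsets;

    public class SerializedSizeSketch {
        // Hypothetical stand-in for the event payload; not an Eagle class.
        static class SampleEvent implements Serializable {
            private static final long serialVersionUID = 1L;
            String metric = "CPU";
            String host = "LOCALHOST";
            boolean flag = true;
            long counter = Long.MAX_VALUE;
            double value = 60.0;
        }

        public static void main(String[] args) {
            SampleEvent event = new SampleEvent();
            int javaLength = SerializationUtils.serialize(event).length;

            // Raw payload only: UTF-8 strings plus fixed-width primitives (1 + 8 + 8 bytes).
            int compactLength = event.metric.getBytes(StandardCharsets.UTF_8).length
                + event.host.getBytes(StandardCharsets.UTF_8).length
                + 1 + Long.BYTES + Double.BYTES;

            System.out.println("Java serialization: " + javaLength + " bytes");
            System.out.println("Raw field payload:  " + compactLength + " bytes");
        }
    }

The ObjectOutputStream header, class descriptor and per-field metadata dominate the Java-serialized form, which is the overhead the assertion compactLength * 20 < javaSerializationLength in the test is probing.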

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
index a756ebe..4241c3c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
@@ -16,9 +16,14 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
-import java.io.IOException;
-import java.util.BitSet;
-
+import backtype.storm.serialization.DefaultKryoFactory;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 import org.apache.commons.lang.time.StopWatch;
 import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -30,66 +35,63 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.util.BitSet;
 
 
 public class PartitionedEventSerializerTest {
     private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
+
     @SuppressWarnings("deprecation")
     @Test
     public void testPartitionEventSerialization() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
+        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
+        ;
         PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
 
         ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-        serializer.serialize(partitionedEvent,dataOutput1);
+        serializer.serialize(partitionedEvent, dataOutput1);
         byte[] serializedBytes = dataOutput1.toByteArray();
         PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-        Assert.assertEquals(partitionedEvent,deserializedEvent);
+        Assert.assertEquals(partitionedEvent, deserializedEvent);
 
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
 
         byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
         PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-        Assert.assertEquals(partitionedEvent,deserializedEventCompressed);
+        Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
 
         PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
         ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-        serializer2.serialize(partitionedEvent,dataOutput2);
+        serializer2.serialize(partitionedEvent, dataOutput2);
         byte[] serializedBytes2 = dataOutput2.toByteArray();
         ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
         PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-        Assert.assertEquals(partitionedEvent,deserializedEvent2);
+        Assert.assertEquals(partitionedEvent, deserializedEvent2);
 
         byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
         Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
         Output output = new Output(10000);
-        kryo.writeClassAndObject(output,partitionedEvent);
+        kryo.writeClassAndObject(output, partitionedEvent);
         byte[] kryoBytes = output.toBytes();
         Input input = new Input(kryoBytes);
         PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-        Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
-        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
+        Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
+        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}", serializedBytes.length, serializedBytesCompressed.length, serializedBytes2.length, javaSerialization.length, kryoBytes.length, kryoSerialize(serializedBytes).length, kryoSerialize(serializedBytes2).length);
     }
+
     @SuppressWarnings("deprecation")
     @Test
     public void testPartitionEventSerializationEfficiency() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
+        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
+        ;
         PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
 
         int count = 100000;
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         int i = 0;
-        while(i<count) {
+        while (i < count) {
             ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
             serializer.serialize(partitionedEvent, dataOutput1);
             byte[] serializedBytes = dataOutput1.toByteArray();
@@ -98,24 +100,24 @@ public class PartitionedEventSerializerTest {
             i++;
         }
         stopWatch.stop();
-        LOG.info("Cached Stream: {} ms",stopWatch.getTime());
+        LOG.info("Cached Stream: {} ms", stopWatch.getTime());
         stopWatch.reset();
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
         i = 0;
         stopWatch.start();
-        while(i<count) {
+        while (i < count) {
             byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
             PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
             Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
             i++;
         }
         stopWatch.stop();
-        LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime());
+        LOG.info("Compressed Cached Stream: {} ms", stopWatch.getTime());
         stopWatch.reset();
 
         i = 0;
         stopWatch.start();
-        while(i<count) {
+        while (i < count) {
             PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
             ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
             serializer2.serialize(partitionedEvent, dataOutput2);
@@ -126,23 +128,23 @@ public class PartitionedEventSerializerTest {
             i++;
         }
         stopWatch.stop();
-        LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime());
+        LOG.info("Cached Stream&Partition: {} ms", stopWatch.getTime());
         stopWatch.reset();
         i = 0;
         stopWatch.start();
-        while(i<count) {
+        while (i < count) {
             byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
             PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization);
             Assert.assertEquals(partitionedEvent, javaSerializedEvent);
             i++;
         }
         stopWatch.stop();
-        LOG.info("Java Native: {} ms",stopWatch.getTime());
+        LOG.info("Java Native: {} ms", stopWatch.getTime());
         stopWatch.reset();
         i = 0;
         stopWatch.start();
         Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        while(i<count) {
+        while (i < count) {
             Output output = new Output(10000);
             kryo.writeClassAndObject(output, partitionedEvent);
             byte[] kryoBytes = output.toBytes();
@@ -152,73 +154,73 @@ public class PartitionedEventSerializerTest {
             i++;
         }
         stopWatch.stop();
-        LOG.info("Kryo: {} ms",stopWatch.getTime());
+        LOG.info("Kryo: {} ms", stopWatch.getTime());
     }
 
     /**
      * Kryo Serialization Length = Length of byte[] + 2
      */
     @Test
-    public void testKryoByteArraySerialization(){
+    public void testKryoByteArraySerialization() {
         Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9};
+        byte[] bytes = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
         Output output = new Output(1000);
-        kryo.writeObject(output,bytes);
-        Assert.assertEquals(bytes.length + 2,output.toBytes().length);
+        kryo.writeObject(output, bytes);
+        Assert.assertEquals(bytes.length + 2, output.toBytes().length);
     }
 
-    private byte[] kryoSerialize(Object object){
+    private byte[] kryoSerialize(Object object) {
         Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
         Output output = new Output(100000);
-        kryo.writeClassAndObject(output,object);
+        kryo.writeClassAndObject(output, object);
         return output.toBytes();
     }
 
     @Test
-    public void testBitSet(){
+    public void testBitSet() {
         BitSet bitSet = new BitSet();
-        bitSet.set(0,true); // 1
-        bitSet.set(1,false); // 0
-        bitSet.set(2,true); // 1
-        LOG.info("Bit Set Size: {}",bitSet.size());
-        LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray());
-        LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray());
-        LOG.info("BitSet[0]: {}",bitSet.get(0));
-        LOG.info("BitSet[1]: {}",bitSet.get(1));
-        LOG.info("BitSet[1]: {}",bitSet.get(2));
+        bitSet.set(0, true); // 1
+        bitSet.set(1, false); // 0
+        bitSet.set(2, true); // 1
+        LOG.info("Bit Set Size: {}", bitSet.size());
+        LOG.info("Bit Set Byte[]: {}", bitSet.toByteArray());
+        LOG.info("Bit Set Byte[]: {}", bitSet.toLongArray());
+        LOG.info("BitSet[0]: {}", bitSet.get(0));
+        LOG.info("BitSet[1]: {}", bitSet.get(1));
+        LOG.info("BitSet[1]: {}", bitSet.get(2));
 
         byte[] bytes = bitSet.toByteArray();
 
         BitSet bitSet2 = BitSet.valueOf(bytes);
 
-        LOG.info("Bit Set Size: {}",bitSet2.size());
-        LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray());
-        LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray());
-        LOG.info("BitSet[0]: {}",bitSet2.get(0));
-        LOG.info("BitSet[1]: {}",bitSet2.get(1));
-        LOG.info("BitSet[1]: {}",bitSet2.get(2));
+        LOG.info("Bit Set Size: {}", bitSet2.size());
+        LOG.info("Bit Set Byte[]: {}", bitSet2.toByteArray());
+        LOG.info("Bit Set Byte[]: {}", bitSet2.toLongArray());
+        LOG.info("BitSet[0]: {}", bitSet2.get(0));
+        LOG.info("BitSet[1]: {}", bitSet2.get(1));
+        LOG.info("BitSet[1]: {}", bitSet2.get(2));
 
 
         BitSet bitSet3 = new BitSet();
-        bitSet3.set(0,true);
-        Assert.assertEquals(1,bitSet3.length());
+        bitSet3.set(0, true);
+        Assert.assertEquals(1, bitSet3.length());
 
         BitSet bitSet4 = new BitSet();
-        bitSet4.set(0,false);
-        Assert.assertEquals(0,bitSet4.length());
+        bitSet4.set(0, false);
+        Assert.assertEquals(0, bitSet4.length());
         Assert.assertFalse(bitSet4.get(1));
         Assert.assertFalse(bitSet4.get(2));
     }
 
     @Test
-    public void testPeriod(){
-        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
-        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000)));
-        Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString());
+    public void testPeriod() {
+        Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
+        Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30 * 60 * 1000)));
+        Assert.assertEquals("PT1800S", Period.millis(30 * 60 * 1000).toString());
     }
 
     @Test
-    public void testPartitionType(){
+    public void testPartitionType() {
 
     }
 }
\ No newline at end of file
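
testPartitionEventSerialization and the efficiency test above benchmark the cached-stream serializers against a plain Kryo baseline. The sketch below is a minimal, self-contained version of that Kryo round trip, with a HashMap standing in for the PartitionedEvent; the buffer size mirrors the new Output(10000) used in the test, while setRegistrationRequired(false) is an assumption added so the sketch runs without registering classes.

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    import java.util.HashMap;
    import java.util.Map;

    public class KryoRoundTripSketch {
        public static void main(String[] args) {
            Kryo kryo = new Kryo();
            kryo.setRegistrationRequired(false); // accept unregistered classes in this sketch

            Map<String, Object> payload = new HashMap<>();
            payload.put("name", "cpu");
            payload.put("host", "localhost");
            payload.put("value", 60.0);

            Output output = new Output(10000);          // fixed buffer, as in the test
            kryo.writeClassAndObject(output, payload);  // writes class id + object bytes
            byte[] bytes = output.toBytes();

            Input input = new Input(bytes);
            Object restored = kryo.readClassAndObject(input);

            System.out.println(bytes.length + " bytes, round trip ok: " + payload.equals(restored));
        }
    }

Kryo adds a small constant framing cost on top of the raw data, which the testKryoByteArraySerialization case above pins at two bytes for a byte[] under Storm's default Kryo configuration.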

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
index 3d373b6..9520b62 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
@@ -32,32 +32,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @since Jun 21, 2016
- *
  */
 public class SiddhiPolicyTest {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class);
 
     private String streams = " define stream syslog_stream("
-            + "dims_facility string, "
-            + "dims_severity string, "
-            + "dims_hostname string, "
-            + "dims_msgid string, "
-            + "timestamp string, "
-            + "conn string, "
-            + "op string, "
-            + "msgId string, "
-            + "command string, "
-            + "name string, "
-            + "namespace string, "
-            + "epochMillis long); ";
+        + "dims_facility string, "
+        + "dims_severity string, "
+        + "dims_hostname string, "
+        + "dims_msgid string, "
+        + "timestamp string, "
+        + "conn string, "
+        + "op string, "
+        + "msgId string, "
+        + "command string, "
+        + "name string, "
+        + "namespace string, "
+        + "epochMillis long); ";
     private SiddhiManager sm;
-    
+
     @Before
     public void setup() {
         sm = new SiddhiManager();
     }
-    
+
     @After
     public void shutdown() {
         sm.shutdown();
@@ -70,7 +69,9 @@ public class SiddhiPolicyTest {
             @Override
             public void receive(Event[] arg0) {
 
-            };
+            }
+
+            ;
         };
 
         String executionPlan = streams + ql;
@@ -83,13 +84,13 @@ public class SiddhiPolicyTest {
     @Test
     public void testPolicy_agg() throws Exception {
         String sql = " from syslog_stream#window.time(1min) select "
-                + "name, "
-                + "namespace, "
-                + "timestamp, "
-                + "dims_hostname, "
-                + "count(*) as abortCount "
-                + "group by dims_hostname "
-                + "having abortCount > 3 insert into syslog_severity_check_output; ";
+            + "name, "
+            + "namespace, "
+            + "timestamp, "
+            + "dims_hostname, "
+            + "count(*) as abortCount "
+            + "group by dims_hostname "
+            + "having abortCount > 3 insert into syslog_severity_check_output; ";
 
         final AtomicBoolean checked = new AtomicBoolean(false);
         StreamCallback sc = new StreamCallback() {
@@ -107,7 +108,9 @@ public class SiddhiPolicyTest {
                 Assert.assertTrue(hosts.contains("HOSTNAME-" + 1));
                 Assert.assertTrue(hosts.contains("HOSTNAME-" + 2));
                 Assert.assertFalse(hosts.contains("HOSTNAME-" + 3));
-            };
+            }
+
+            ;
         };
 
         String executionPlan = streams + sql;
@@ -124,7 +127,7 @@ public class SiddhiPolicyTest {
 
         runtime.shutdown();
     }
-    
+
     /*
         + "dims_facility string, "
         + "dims_severity string, "
@@ -145,8 +148,8 @@ public class SiddhiPolicyTest {
         for (int i = 0; i < length; i++) {
             Event e = new Event(12);
             e.setTimestamp(System.currentTimeMillis());
-            e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i%4 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-            
+            e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i % 4, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+
             events[i] = e;
         }
 
@@ -156,7 +159,7 @@ public class SiddhiPolicyTest {
 
         Event e = new Event(12);
         e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
         handler.send(e);
     }
 
@@ -164,28 +167,30 @@ public class SiddhiPolicyTest {
     @Test
     public void testPolicy_regex() throws Exception {
         String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
-        
+
         AtomicBoolean checked = new AtomicBoolean();
         StreamCallback sc = new StreamCallback() {
             @Override
             public void receive(Event[] arg0) {
                 checked.set(true);
-            };
+            }
+
+            ;
         };
 
         String executionPlan = streams + sql;
         ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
-        runtime.addCallback("syslog_severity_check_output", sc); 
+        runtime.addCallback("syslog_severity_check_output", sc);
         runtime.start();
-        
+
         InputHandler handler = runtime.getInputHandler("syslog_stream");
-        
+
         sendInput(handler);
-        
+
         Thread.sleep(1000);
-        
+
         Assert.assertTrue(checked.get());
-        
+
         runtime.shutdown();
     }
 
@@ -193,17 +198,19 @@ public class SiddhiPolicyTest {
     @Test
     public void testPolicy_seq() throws Exception {
         String sql = ""
-                + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
-                + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
-                + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
-                + " insert into syslog_severity_check_output; ";
+            + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
+            + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
+            + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
+            + " insert into syslog_severity_check_output; ";
 
         AtomicBoolean checked = new AtomicBoolean();
         StreamCallback sc = new StreamCallback() {
             @Override
             public void receive(Event[] arg0) {
                 checked.set(true);
-            };
+            }
+
+            ;
         };
 
         String executionPlan = streams + sql;
@@ -224,21 +231,21 @@ public class SiddhiPolicyTest {
         // validate one
         Event e = new Event(12);
         e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-            
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+
         e = new Event(12);
         e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-        
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+
         e = new Event(12);
         e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
 
         Thread.sleep(61 * 1000);
 
         e = new Event(12);
         e.setTimestamp(System.currentTimeMillis());
-        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
         handler.send(e);
     }
 
@@ -246,7 +253,7 @@ public class SiddhiPolicyTest {
     @Test
     public void testStrConcat() throws Exception {
         String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-                " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
+            " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
         SiddhiManager manager = new SiddhiManager();
         ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
         runtime.addCallback("output", new StreamCallback() {
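
The policy tests above all follow the same Siddhi 3.x pattern: build an execution plan from stream definitions plus a query, attach a StreamCallback to the output stream, and push events through an InputHandler. Below is a minimal, self-contained sketch of that flow; the log stream, the value > 90.0 filter and the host names are invented for illustration and do not come from the module.

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;

    public class SiddhiCallbackSketch {
        public static void main(String[] args) throws Exception {
            String plan = "define stream log (timestamp long, host string, value double); "
                + "from log[value > 90.0] select timestamp, host, value insert into output; ";

            SiddhiManager manager = new SiddhiManager();
            ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(plan);
            runtime.addCallback("output", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event e : events) {
                        System.out.println("matched: " + e); // only events passing the filter arrive here
                    }
                }
            });
            runtime.start();

            InputHandler handler = runtime.getInputHandler("log");
            handler.send(new Object[] {System.currentTimeMillis(), "host-1", 95.0});
            handler.send(new Object[] {System.currentTimeMillis(), "host-2", 10.0});

            Thread.sleep(500); // callbacks are asynchronous; give them a moment before shutdown
            runtime.shutdown();
            manager.shutdown();
        }
    }

As in testPolicy_agg and testPolicy_regex, the callback fires asynchronously, which is why the tests (and this sketch) pause briefly before asserting or shutting the runtime down.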

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
index f2f3b46..7694623 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
@@ -18,10 +18,6 @@
  */
 package org.apache.eagle.alert.engine.siddhi.extension;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,9 +27,12 @@ import org.wso2.siddhi.core.event.Event;
 import org.wso2.siddhi.core.stream.input.InputHandler;
 import org.wso2.siddhi.core.stream.output.StreamCallback;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
 /**
  * @since Apr 1, 2016
- *
  */
 public class AttributeCollectAggregatorTest {
 
@@ -85,59 +84,59 @@ public class AttributeCollectAggregatorTest {
         Event e = null;
         long base = System.currentTimeMillis();
         {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
             base += 100;
             events.add(e);
         }
 
         {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
             base += 100;
             events.add(e);
         }
 
         base += 10000;
         {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
             base += 100;
             events.add(e);
         }
 
         {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
             base += 100;
             events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+            e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
             base += 100;
             events.add(e);
         }
         base += 10000;
-        e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" });
+        e = new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"});
 
         return events.toArray(new Event[0]);
     }
-    
+
     @Test
     public void testQuery() {
         String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";