You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:05 UTC

[04/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
index 613be00..ebec509 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
@@ -25,19 +25,19 @@ import org.mapdb.Serializer;
 
 public class MapDBTestSuite {
     @Test
-    public void testOnHeapDB(){
+    public void testOnHeapDB() {
         DB db = DBMaker.heapDB().make();
-        BTreeMap<Long,String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
-        Assert.assertFalse(map.putIfAbsentBoolean(1L,"val_1"));
-        Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_2"));
-        Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_3"));
-        Assert.assertFalse(map.putIfAbsentBoolean(2L,"val_4"));
+        BTreeMap<Long, String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
+        Assert.assertFalse(map.putIfAbsentBoolean(1L, "val_1"));
+        Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_2"));
+        Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_3"));
+        Assert.assertFalse(map.putIfAbsentBoolean(2L, "val_4"));
 
-        Assert.assertEquals("val_1",map.get(1L));
-        Assert.assertEquals("val_4",map.get(2L));
+        Assert.assertEquals("val_1", map.get(1L));
+        Assert.assertEquals("val_4", map.get(2L));
 
-        Assert.assertTrue(map.replace(2L,"val_4","val_5"));
-        Assert.assertEquals("val_5",map.get(2L));
+        Assert.assertTrue(map.replace(2L, "val_4", "val_5"));
+        Assert.assertEquals("val_5", map.get(2L));
 
         map.close();
         db.close();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
index 6cadba7..98657e7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
@@ -16,14 +16,12 @@
  */
 package org.apache.eagle.alert.engine.sorter;
 
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.google.common.collect.Ordering;
 import org.apache.commons.lang.time.StopWatch;
 import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
 import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
@@ -38,12 +36,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.google.common.collect.Ordering;
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 /**
  * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
@@ -56,23 +51,24 @@ public class StreamSortHandlerTest {
     }
 
     private ScheduledReporter metricReporter;
+
     @Before
-    public void setUp(){
+    public void setUp() {
         final MetricRegistry metrics = new MetricRegistry();
         metrics.registerAll(new MemoryUsageGaugeSet());
         metrics.registerAll(new GarbageCollectorMetricSet());
         metricReporter = Slf4jReporter.forRegistry(metrics)
-                .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
-                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
-        metricReporter.start(60,TimeUnit.SECONDS);
+            .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
+            .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build();
+        metricReporter.start(60, TimeUnit.SECONDS);
     }
 
     /**
      * Used to debug window bucket lifecycle
-     *
+     * <p>
      * Window period: PT1s, margin: 5s
      *
      * @throws InterruptedException
@@ -83,18 +79,18 @@ public class StreamSortHandlerTest {
         StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
         Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
         StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m",5000),mockCollector);
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m", 5000), mockCollector);
         List<PartitionedEvent> unsortedList = new LinkedList<>();
 
         int i = 0;
-        while(i<1000) {
+        while (i < 1000) {
             PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
             sortHandler.nextEvent(event);
             unsortedList.add(event);
-            if(event.getTimestamp()>timeClock.getTime()) {
+            if (event.getTimestamp() > timeClock.getTime()) {
                 timeClock.moveForward(event.getTimestamp());
             }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            sortHandler.onTick(timeClock, System.currentTimeMillis());
             i++;
         }
         sortHandler.close();
@@ -114,24 +110,24 @@ public class StreamSortHandlerTest {
         StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
         Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
         StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
         List<PartitionedEvent> sortedList = new LinkedList<>();
 
         int i = 0;
-        while(i<1000000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+        while (i < 1000000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
             sortHandler.nextEvent(event);
             sortedList.add(event);
-            if(event.getTimestamp()>timeClock.getTime()) {
+            if (event.getTimestamp() > timeClock.getTime()) {
                 timeClock.moveForward(event.getTimestamp());
             }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            sortHandler.onTick(timeClock, System.currentTimeMillis());
             i++;
         }
         sortHandler.close();
         Assert.assertTrue(timeOrdering.isOrdered(sortedList));
         Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1000000,mockCollector.get().size());
+        Assert.assertEquals(1000000, mockCollector.get().size());
     }
 
     /**
@@ -159,33 +155,33 @@ public class StreamSortHandlerTest {
         StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
         Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
         StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
         List<PartitionedEvent> unsortedList = new LinkedList<>();
 
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         int i = 0;
-        while(i<count) {
+        while (i < count) {
             PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
             sortHandler.nextEvent(event);
             unsortedList.add(event);
-            if(event.getEvent().getTimestamp()>timeClock.getTime()) {
+            if (event.getEvent().getTimestamp() > timeClock.getTime()) {
                 timeClock.moveForward(event.getEvent().getTimestamp());
             }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            sortHandler.onTick(timeClock, System.currentTimeMillis());
             i++;
         }
         stopWatch.stop();
-        LOG.info("Produced {} events in {} ms",count,stopWatch.getTime());
+        LOG.info("Produced {} events in {} ms", count, stopWatch.getTime());
         sortHandler.close();
         Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
         Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertTrue(mockCollector.get().size()>=0);
+        Assert.assertTrue(mockCollector.get().size() >= 0);
     }
 
     /**
      * Used to debug window bucket lifecycle
-     *
+     * <p>
      * Window period: PT1h, margin: 5s
      *
      * @throws InterruptedException
@@ -196,29 +192,29 @@ public class StreamSortHandlerTest {
         StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
         Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
         StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
         List<PartitionedEvent> sortedList = new LinkedList<>();
 
         int i = 0;
-        while(i<1000000) {
-            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+        while (i < 1000000) {
+            PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
             sortHandler.nextEvent(event);
             sortedList.add(event);
-            if(event.getTimestamp()>timeClock.getTime()) {
+            if (event.getTimestamp() > timeClock.getTime()) {
                 timeClock.moveForward(event.getTimestamp());
             }
-            sortHandler.onTick(timeClock,System.currentTimeMillis());
+            sortHandler.onTick(timeClock, System.currentTimeMillis());
             i++;
         }
         sortHandler.close();
         Assert.assertTrue(timeOrdering.isOrdered(sortedList));
         Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1000000,mockCollector.get().size());
+        Assert.assertEquals(1000000, mockCollector.get().size());
     }
 
     /**
      * Used to debug window bucket lifecycle
-     *
+     * <p>
      * Window period: PT1h, margin: 5s
      *
      * @throws InterruptedException
@@ -229,26 +225,26 @@ public class StreamSortHandlerTest {
         StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
         Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
         StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
-        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s",1000),mockCollector);
+        sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s", 1000), mockCollector);
         List<PartitionedEvent> sortedList = new LinkedList<>();
 
         PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
         sortHandler.nextEvent(event);
         sortedList.add(event);
         timeClock.moveForward(event.getTimestamp());
-        sortHandler.onTick(timeClock,System.currentTimeMillis());
+        sortHandler.onTick(timeClock, System.currentTimeMillis());
 
         // Triggered to become expired by System time
-        sortHandler.onTick(timeClock,System.currentTimeMillis()+10*1000+1000L + 1);
+        sortHandler.onTick(timeClock, System.currentTimeMillis() + 10 * 1000 + 1000L + 1);
 
         Assert.assertTrue(timeOrdering.isOrdered(sortedList));
         Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
-        Assert.assertEquals(1,mockCollector.get().size());
+        Assert.assertEquals(1, mockCollector.get().size());
 
         sortHandler.close();
     }
 
-//    @Test
+    //    @Test
     public void testWithTimerLock() throws InterruptedException {
         Timer timer = new Timer();
         List<Long> collected = new ArrayList<>();
@@ -265,6 +261,6 @@ public class StreamSortHandlerTest {
                     }
                 }
             }
-        },0,100);
+        }, 0, 100);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
index a2f0da6..3d32d72 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
@@ -16,10 +16,11 @@
  */
 package org.apache.eagle.alert.engine.sorter;
 
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import org.apache.commons.lang.time.StopWatch;
 import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -31,115 +32,120 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 @Ignore("Ignore automatic heavy benchmark test")
 public class StreamWindowBenchmarkTest {
     private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
+
     public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
-        LOGGER.info("Sending {} events to {} ({})",num,window.getClass().getSimpleName(),storageType);
+        LOGGER.info("Sending {} events to {} ({})", num, window.getClass().getSimpleName(), storageType);
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
-        int i=0;
-        while(i<num) {
-            PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1",(window.startTime()+i));
+        int i = 0;
+        while (i < num) {
+            PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1", (window.startTime() + i));
             window.add(event);
             i++;
         }
         stopWatch.stop();
-        performanceReport.put(num+"\tInsertTime\t"+storageType,stopWatch.getTime());
-        LOGGER.info("Inserted {} events in {} ms",num,stopWatch.getTime());
+        performanceReport.put(num + "\tInsertTime\t" + storageType, stopWatch.getTime());
+        LOGGER.info("Inserted {} events in {} ms", num, stopWatch.getTime());
         stopWatch.reset();
         stopWatch.start();
         window.flush();
         stopWatch.stop();
-        performanceReport.put(num+"\tReadTime\t"+storageType,stopWatch.getTime());
+        performanceReport.put(num + "\tReadTime\t" + storageType, stopWatch.getTime());
     }
 
     private ScheduledReporter metricReporter;
-    private Map<String,Long> performanceReport;
+    private Map<String, Long> performanceReport;
+
     @Before
-    public void setUp(){
+    public void setUp() {
         final MetricRegistry metrics = new MetricRegistry();
         metrics.registerAll(new MemoryUsageGaugeSet());
         metrics.registerAll(new GarbageCollectorMetricSet());
         metricReporter = ConsoleReporter.forRegistry(metrics)
-                .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
+            .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
 //                .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
-        metricReporter.start(60,TimeUnit.SECONDS);
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build();
+        metricReporter.start(60, TimeUnit.SECONDS);
         performanceReport = new TreeMap<>();
     }
 
     @After
-    public void after(){
+    public void after() {
         StringBuilder sb = new StringBuilder();
-        for(Map.Entry<String,Long> entry:performanceReport.entrySet()){
-            sb.append(String.format("%-40s\t%s\n",entry.getKey(),entry.getValue()));
+        for (Map.Entry<String, Long> entry : performanceReport.entrySet()) {
+            sb.append(String.format("%-40s\t%s\n", entry.getKey(), entry.getValue()));
         }
-        LOGGER.info("\n===== Benchmark Result Report =====\n\n{}",sb.toString());
+        LOGGER.info("\n===== Benchmark Result Report =====\n\n{}", sb.toString());
     }
 
     private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
     private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
-    private final long margin = (stop - start)/3;
+    private final long margin = (stop - start) / 3;
 
-    private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType){
+    private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType) {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
-        LOGGER.info("\n===== Benchmark Test for {} ({}) =====",window.getClass().getSimpleName(),storageType);
+        LOGGER.info("\n===== Benchmark Test for {} ({}) =====", window.getClass().getSimpleName(), storageType);
         metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,1000);
+        sendDESCOrderedEventsToWindow(window, storageType, 1000);
         metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,10000);
+        sendDESCOrderedEventsToWindow(window, storageType, 10000);
         metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,100000);
+        sendDESCOrderedEventsToWindow(window, storageType, 100000);
         metricReporter.report();
-        sendDESCOrderedEventsToWindow(window,storageType,1000000);
+        sendDESCOrderedEventsToWindow(window, storageType, 1000000);
         metricReporter.report();
         stopWatch.stop();
-        LOGGER.info("\n===== Finished in total {} ms =====\n",stopWatch.getTime());
+        LOGGER.info("\n===== Finished in total {} ms =====\n", stopWatch.getTime());
     }
 
-    @Test @Ignore
-    public void testStreamWindowBenchmarkMain(){
+    @Test
+    @Ignore
+    public void testStreamWindowBenchmarkMain() {
         testStreamSortedWindowOnHeap();
         testStreamSortedWindowInSerializedMemory();
         testStreamSortedWindowOffHeap();
         testStreamSortedWindowFile();
     }
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void testStreamSortedWindowOnHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
-        benchmarkTest(window,StreamWindowRepository.StorageType.ONHEAP);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP);
+        benchmarkTest(window, StreamWindowRepository.StorageType.ONHEAP);
         window.close();
     }
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void testStreamSortedWindowInSerializedMemory() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
-        benchmarkTest(window,StreamWindowRepository.StorageType.MEMORY);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY);
+        benchmarkTest(window, StreamWindowRepository.StorageType.MEMORY);
         window.close();
     }
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void testStreamSortedWindowOffHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
-        benchmarkTest(window,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        benchmarkTest(window, StreamWindowRepository.StorageType.DIRECT_MEMORY);
         window.close();
     }
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void testStreamSortedWindowFile() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
-        benchmarkTest(window,StreamWindowRepository.StorageType.FILE_RAF);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF);
+        benchmarkTest(window, StreamWindowRepository.StorageType.FILE_RAF);
         window.close();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
index 950aa34..6dc9c9f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
@@ -16,8 +16,7 @@
  */
 package org.apache.eagle.alert.engine.sorter;
 
-import java.util.List;
-
+import com.google.common.collect.Ordering;
 import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
 import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -29,7 +28,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Ordering;
+import java.util.List;
 
 @SuppressWarnings("unused")
 public class StreamWindowTestSuite {
@@ -37,34 +36,34 @@ public class StreamWindowTestSuite {
 
     private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
     private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000");
-    private final long margin = (stop - start)/3;
+    private final long margin = (stop - start) / 3;
 
     @Test
     public void testStreamSortedWindowOnHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP);
         streamSortedWindowMustTest(window);
     }
 
     @Test
     public void testStreamSortedWindowInSerializedMemory() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY);
         streamSortedWindowMustTest(window);
     }
 
     @Test
     public void testStreamSortedWindowOffHeap() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
         streamSortedWindowMustTest(window);
     }
 
     @Test
     public void testStreamSortedWindowFile() {
-        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
+        StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF);
         streamSortedWindowMustTest(window);
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void streamSortedWindowMustTest(StreamWindow window){
+    @SuppressWarnings( {"unchecked", "rawtypes"})
+    private void streamSortedWindowMustTest(StreamWindow window) {
         MockPartitionedCollector collector = new MockPartitionedCollector();
         window.register(collector);
 
@@ -72,7 +71,7 @@ public class StreamWindowTestSuite {
         clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"));
 
         // Current time is: "2016-05-04 00:00:30"
-        window.onTick(clock,System.currentTimeMillis());
+        window.onTick(clock, System.currentTimeMillis());
 
         Assert.assertTrue(window.alive());
         Assert.assertFalse(window.expired());
@@ -91,74 +90,74 @@ public class StreamWindowTestSuite {
         Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")));
 
         // Accepted
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
 
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
 
         // Should accept Duplicated
-        Assert.assertTrue("Should support duplicated timestamp",window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+        Assert.assertTrue("Should support duplicated timestamp", window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
 
-        Assert.assertEquals(6,window.size());
+        Assert.assertEquals(6, window.size());
 
         // Rejected
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
 
-        Assert.assertEquals(6,window.size());
+        Assert.assertEquals(6, window.size());
 
         // Now is: "2016-05-04 00:00:55"
         clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000"));
-        window.onTick(clock,System.currentTimeMillis());
+        window.onTick(clock, System.currentTimeMillis());
         Assert.assertTrue(window.alive());
         Assert.assertFalse(window.expired());
-        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
-        Assert.assertEquals(7,window.size());
+        Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+        Assert.assertEquals(7, window.size());
 
         // Flush when stream time delay too much after system time but window will still be alive
-        window.onTick(clock,System.currentTimeMillis() + 1 + stop - start + margin);
+        window.onTick(clock, System.currentTimeMillis() + 1 + stop - start + margin);
         Assert.assertTrue(window.alive());
         Assert.assertFalse(window.expired());
-        Assert.assertEquals(0,window.size());
-        Assert.assertEquals(7,collector.size());
+        Assert.assertEquals(0, window.size());
+        Assert.assertEquals(7, collector.size());
 
         Assert.assertFalse("Because window has flushed but not expired, window should reject future events < last flush stream time",
-                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
+            window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
         Assert.assertTrue("Because window has flushed but not expired, window should still accept future events >= last flush stream time",
-                window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
-        Assert.assertEquals(1,window.size());
-        Assert.assertEquals(7,collector.size());
+            window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
+        Assert.assertEquals(1, window.size());
+        Assert.assertEquals(7, collector.size());
 
         // Now is: "2016-05-04 00:01:10", not expire,
         clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000"));
-        window.onTick(clock,System.currentTimeMillis() + 2 * (1+ stop - start + margin));
-        Assert.assertEquals(8,collector.size());
+        window.onTick(clock, System.currentTimeMillis() + 2 * (1 + stop - start + margin));
+        Assert.assertEquals(8, collector.size());
 
         // Now is: "2016-05-04 00:01:20", expire
         clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000"));
-        window.onTick(clock,System.currentTimeMillis());
+        window.onTick(clock, System.currentTimeMillis());
         Assert.assertFalse(window.alive());
         Assert.assertTrue(window.expired());
-        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
-        Assert.assertEquals(0,window.size());
+        Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+        Assert.assertEquals(0, window.size());
 
-        Assert.assertEquals(8,collector.size());
+        Assert.assertEquals(8, collector.size());
 
         Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
         Assert.assertTrue(ordering.isOrdered(collector.get()));
 
         List<PartitionedEvent> list = collector.get();
-        Assert.assertEquals(8,list.size());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(0).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(1).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"),list.get(2).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"),list.get(3).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"),list.get(4).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"),list.get(5).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"),list.get(6).getTimestamp());
-        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"),list.get(7).getTimestamp());
+        Assert.assertEquals(8, list.size());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(0).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(1).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"), list.get(2).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"), list.get(3).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"), list.get(4).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"), list.get(5).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"), list.get(6).getTimestamp());
+        Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"), list.get(7).getTimestamp());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
index b7e76b8..c59f0de 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.eagle.alert.engine.sorter;
 
-import java.util.Arrays;
-import java.util.Iterator;
-
+import com.google.common.collect.TreeMultiset;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -26,7 +24,8 @@ import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingCom
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.TreeMultiset;
+import java.util.Arrays;
+import java.util.Iterator;
 
 /**
  * Since 5/10/16.
@@ -37,7 +36,7 @@ public class TreeMultisetComparatorTest {
      * when they are added into TreeMultiset, the second event will be replaced by the first event
      */
     @Test
-    public void testComparator(){
+    public void testComparator() {
         TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
 
         // construct PartitionEvent1
@@ -50,7 +49,7 @@ public class TreeMultisetComparatorTest {
         event1.setPartition(sp);
         event1.setPartitionKey(1000);
         StreamEvent e1 = new StreamEvent();
-        e1.setData(new Object[]{18.4});
+        e1.setData(new Object[] {18.4});
         e1.setStreamId("testStreamId");
         e1.setTimestamp(1462909984000L);
         event1.setEvent(e1);
@@ -60,7 +59,7 @@ public class TreeMultisetComparatorTest {
         event2.setPartition(sp);
         event2.setPartitionKey(1000);
         StreamEvent e2 = new StreamEvent();
-        e2.setData(new Object[]{16.3});
+        e2.setData(new Object[] {16.3});
         e2.setStreamId("testStreamId");
         e2.setTimestamp(1462909984000L);
         event2.setEvent(e2);
@@ -70,7 +69,7 @@ public class TreeMultisetComparatorTest {
         event3.setPartition(sp);
         event3.setPartitionKey(1000);
         StreamEvent e3 = new StreamEvent();
-        e3.setData(new Object[]{14.3});
+        e3.setData(new Object[] {14.3});
         e3.setStreamId("testStreamId");
         e3.setTimestamp(1462909984001L);
         event3.setEvent(e3);
@@ -79,13 +78,13 @@ public class TreeMultisetComparatorTest {
         event4.setPartition(sp);
         event4.setPartitionKey(1000);
         StreamEvent e4 = new StreamEvent();
-        e4.setData(new Object[]{14.3});
+        e4.setData(new Object[] {14.3});
         e4.setStreamId("testStreamId");
         e4.setTimestamp(1462909984001L);
         event4.setEvent(e4);
 
-        Assert.assertNotEquals(event2,event3);
-        Assert.assertEquals(event3,event4);
+        Assert.assertNotEquals(event2, event3);
+        Assert.assertEquals(event3, event4);
 
         // check content in set
         set.add(event1);
@@ -96,15 +95,15 @@ public class TreeMultisetComparatorTest {
         set.forEach(System.out::println);
 
 
-        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event2));
-        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event3));
-        Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2,event3));
-        Assert.assertEquals(0,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3,event4));
+        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event2));
+        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event3));
+        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2, event3));
+        Assert.assertEquals(0, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3, event4));
 
         Iterator<PartitionedEvent> it = set.iterator();
-        Assert.assertEquals(16.3,it.next().getData()[0]);
-        Assert.assertEquals(18.4,it.next().getData()[0]);
-        Assert.assertEquals(14.3,it.next().getData()[0]);
-        Assert.assertEquals(14.3,it.next().getData()[0]);
+        Assert.assertEquals(16.3, it.next().getData()[0]);
+        Assert.assertEquals(18.4, it.next().getData()[0]);
+        Assert.assertEquals(14.3, it.next().getData()[0]);
+        Assert.assertEquals(14.3, it.next().getData()[0]);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
index 8a5d616..1fc54a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
@@ -106,7 +106,7 @@ public class TestStateCheckPolicy {
             mapdata.put("value", 1000.0 + i * 1000.0);
             mapdata.put("colo", "phx");
 
-            StreamEvent event = StreamEvent.Builder().timestamep(time).attributes(mapdata, definition).build();
+            StreamEvent event = StreamEvent.builder().timestamep(time).attributes(mapdata, definition).build();
             PartitionedEvent pEvent = new PartitionedEvent(event, policyDefinition.getPartitionSpec().get(0), 1);
 
             GeneralTopologyContext mock = Mockito.mock(GeneralTopologyContext.class);
@@ -122,12 +122,13 @@ public class TestStateCheckPolicy {
     @NotNull
     private Map<String, StreamDefinition> createStreamMap() throws Exception {
         List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
-                new TypeReference<List<StreamDefinition>>() {
-                });
+            new TypeReference<List<StreamDefinition>>() {
+            });
         return streams.stream().collect(Collectors.toMap(StreamDefinition::getStreamId, item -> item));
     }
 
     private static ObjectMapper mapper = new ObjectMapper();
+
     static {
         mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     }
@@ -139,8 +140,8 @@ public class TestStateCheckPolicy {
         spec.setTopologyName("testTopology");
 
         List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
-                new TypeReference<List<PolicyDefinition>>() {
-                });
+            new TypeReference<List<PolicyDefinition>>() {
+            });
         Assert.assertTrue(policies.size() > 0);
         spec.addBoltPolicy("alertBolt1", policies.get(0).getName());
         spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(policies));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
index 30a0ef9..7212785 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
@@ -19,10 +19,13 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.spout.CorrelationSpout;
 import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,19 +37,15 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
 
-@SuppressWarnings({"serial", "unused"})
-public class AlertTopologyTest implements Serializable{
+@SuppressWarnings( {"serial", "unused"})
+public class AlertTopologyTest implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class);
     char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
+
     @Ignore
     @Test
     public void testMultipleTopics() throws Exception {
@@ -83,7 +82,7 @@ public class AlertTopologyTest implements Serializable{
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology());
 
-        while(true) {
+        while (true) {
             try {
                 Thread.sleep(1000);
             } catch (Exception e) {
@@ -92,10 +91,10 @@ public class AlertTopologyTest implements Serializable{
         }
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @SuppressWarnings( {"rawtypes", "unchecked"})
     @Ignore
     @Test
-    public void generateRandomStringsToKafka(){
+    public void generateRandomStringsToKafka() {
         String topic = "testTopic3";
         int max = 1000;
         Properties configMap = new Properties();
@@ -104,33 +103,34 @@ public class AlertTopologyTest implements Serializable{
         configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+        configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
 
         int i = 0;
-        while(i++ < max) {
+        while (i++ < max) {
             String randomString = generateRandomString();
             System.out.println("sending string : " + randomString);
             ProducerRecord record = new ProducerRecord(topic, randomString);
             producer.send(record);
-            if(i % 10 == 0){
+            if (i % 10 == 0) {
                 try {
                     Thread.sleep(10);
-                }catch(Exception ex){
+                } catch (Exception ex) {
                 }
             }
         }
         producer.close();
     }
 
-    private String generateRandomString(){
+    private String generateRandomString() {
         long count = Math.round(Math.random() * 10);
-        if(count == 0)
+        if (count == 0) {
             count = 1;
+        }
         StringBuilder sb = new StringBuilder();
-        while(count-- > 0) {
-            int index = (int)(Math.floor(Math.random()*26));
+        while (count-- > 0) {
+            int index = (int) (Math.floor(Math.random() * 26));
             sb.append(alphabets[index]);
         }
         return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
index deca9b4..d011770 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
@@ -19,9 +19,15 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.io.Serializable;
-import java.util.HashMap;
-
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.spout.CorrelationSpout;
 import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
 import org.apache.eagle.alert.utils.AlertConstants;
@@ -33,26 +39,19 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.io.Serializable;
+import java.util.HashMap;
 
 /**
  * Since 4/28/16.
  */
-@SuppressWarnings({"serial", "unused", "rawtypes"})
+@SuppressWarnings( {"serial", "unused", "rawtypes"})
 public class CoordinatorSpoutIntegrationTest implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class);
+
     @Ignore  // this test need zookeeper
     @Test
-    public void testConfigNotify() throws Exception{
+    public void testConfigNotify() throws Exception {
         final String topoId = "myTopology";
         int numGroupbyBolts = 2;
         int numTotalGroupbyBolts = 3;
@@ -77,14 +76,14 @@ public class CoordinatorSpoutIntegrationTest implements Serializable {
         int numBolts = config.getInt("correlation.numGroupbyBolts");
         String spoutId = "correlation-spout";
         CorrelationSpout spout = new CorrelationSpout(config, topoId,
-                new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
+            new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
         SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
         declarer.setNumTasks(2);
         for (int i = 0; i < numBolts; i++) {
             TestBolt bolt = new TestBolt();
             BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
             boltDecl.fieldsGrouping(spoutId,
-                    StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME+i), new Fields());
+                StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME + i), new Fields());
         }
 
         String topoName = config.getString("correlation.topologyName");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
index 5c5a37f..41e49fa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -19,12 +19,10 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
@@ -35,13 +33,13 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
-
 import storm.kafka.KafkaSpoutWrapper;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @Ignore
 public class CorrelationSpoutTest {
@@ -50,17 +48,17 @@ public class CorrelationSpoutTest {
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class);
     private String dataSourceName = "ds-name";
 
-    @SuppressWarnings({ "serial", "rawtypes" })
+    @SuppressWarnings( {"serial", "rawtypes"})
     @Test
-    public void testMetadataInjestion_emptyMetadata() throws Exception{
+    public void testMetadataInjestion_emptyMetadata() throws Exception {
         String topoId = "testMetadataInjection";
         Config config = ConfigFactory.load();
         AtomicBoolean validated = new AtomicBoolean(false);
         CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
             @Override
             protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context,
-                    SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
-                    throws Exception {
+                                                         SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
+                throws Exception {
                 validated.set(true);
                 return null;
             }
@@ -72,7 +70,7 @@ public class CorrelationSpoutTest {
         ds.setTopic("name-of-topic1");
         ds.setSchemeCls("PlainStringScheme");
         ds.setCodec(new Tuple2StreamMetadata());
-        Map<String,Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
+        Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
         dsMap.put(ds.getName(), ds);
 
         StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1");
@@ -81,12 +79,12 @@ public class CorrelationSpoutTest {
         dataSources.put(ds.getName(), Arrays.asList(m1));
 
         SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
-        
+
         spout.onReload(newMetadata, null);
         Assert.assertTrue(validated.get());
     }
 
-    @SuppressWarnings({ "serial", "rawtypes" })
+    @SuppressWarnings( {"serial", "rawtypes"})
     @Test
     public void testMetadataInjestion_oneNewTopic2Streams() throws Exception {
         String topoId = "testMetadataInjection";
@@ -94,15 +92,15 @@ public class CorrelationSpoutTest {
 
         Config config = ConfigFactory.load();
         final AtomicBoolean verified = new AtomicBoolean(false);
-        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1)  {
+        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
             @Override
-            protected KafkaSpoutWrapper createKafkaSpout(Map conf, 
-                    TopologyContext context,
-                    SpoutOutputCollector collector, 
-                    String topic, 
-                    String topic2SchemeClsName,
-                    SpoutSpec streamMetadatas,
-                    Map<String, StreamDefinition> sds) {
+            protected KafkaSpoutWrapper createKafkaSpout(Map conf,
+                                                         TopologyContext context,
+                                                         SpoutOutputCollector collector,
+                                                         String topic,
+                                                         String topic2SchemeClsName,
+                                                         SpoutSpec streamMetadatas,
+                                                         Map<String, StreamDefinition> sds) {
                 Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size());
                 Assert.assertTrue(streamMetadatas.getStream("s1") != null);
                 Assert.assertTrue(streamMetadatas.getStream("s2") != null);
@@ -128,7 +126,7 @@ public class CorrelationSpoutTest {
 
     private Map<String, Kafka2TupleMetadata> createDatasource(final String topicName, final String dataSourceName) {
         Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-        
+
         ds.setName(dataSourceName);
         ds.setType("KAFKA");
         ds.setProperties(new HashMap<String, String>());
@@ -140,9 +138,9 @@ public class CorrelationSpoutTest {
         return dsMap;
     }
 
-    @SuppressWarnings({ "serial", "rawtypes" })
+    @SuppressWarnings( {"serial", "rawtypes"})
     @Test
-    public void testMetadataInjestion_deleteOneTopic() throws Exception{
+    public void testMetadataInjestion_deleteOneTopic() throws Exception {
         String topoId = "testMetadataInjection";
         final String topicName = "testTopic";
         Config config = ConfigFactory.load();
@@ -151,11 +149,12 @@ public class CorrelationSpoutTest {
             @Override
             protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
                                                          String schemeClsName, SpoutSpec streamMetadatas,
-                                                         Map<String, StreamDefinition> sds){
+                                                         Map<String, StreamDefinition> sds) {
                 return new KafkaSpoutWrapper(null, null);
             }
+
             @Override
-            protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
+            protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
                 LOG.info("successfully verified removed topic and streams");
                 verified.set(true);
             }
@@ -174,8 +173,8 @@ public class CorrelationSpoutTest {
         // delete new topic
         try {
             spout.onReload(new SpoutSpec(topoId, new HashMap<String, List<StreamRepartitionMetadata>>(), new HashMap<>(), new HashMap<String, Kafka2TupleMetadata>()),
-                    null);
-        }catch(Exception ex){
+                null);
+        } catch (Exception ex) {
             LOG.error("error reloading spout metadata", ex);
             throw ex;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
index 4779fac..5aff5c8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,23 +17,9 @@
  */
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ExecutorSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.SpoutStats;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologySummary;
+import backtype.storm.generated.*;
 import backtype.storm.metric.LoggingMetricsConsumer;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
@@ -46,6 +32,13 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * WordCount but the spout does not stop, and the bolts are implemented in
@@ -58,11 +51,11 @@ public class FastWordCountTopology {
         SpoutOutputCollector _collector;
         Random _rand;
         private static final String[] CHOICES = {
-                "marry had a little lamb whos fleese was white as snow",
-                "and every where that marry went the lamb was sure to go",
-                "one two three four five six seven eight nine ten",
-                "this is a test of the emergency broadcast system this is only a test",
-                "peter piper picked a peck of pickeled peppers"
+            "marry had a little lamb whos fleese was white as snow",
+            "and every where that marry went the lamb was sure to go",
+            "one two three four five six seven eight nine ten",
+            "this is a test of the emergency broadcast system this is only a test",
+            "peter piper picked a peck of pickeled peppers"
         };
 
         @SuppressWarnings("rawtypes")
@@ -76,7 +69,7 @@ public class FastWordCountTopology {
         public void nextTuple() {
             String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
             _collector.emit(new Values(sentence), sentence);
-            LOG.debug("Emit tuple: {}, id:{}",new Values(sentence),sentence);
+            LOG.debug("Emit tuple: {}, id:{}", new Values(sentence), sentence);
         }
 
         @Override
@@ -99,7 +92,7 @@ public class FastWordCountTopology {
         @Override
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             String sentence = tuple.getString(0);
-            for (String word: sentence.split("\\s+")) {
+            for (String word : sentence.split("\\s+")) {
                 collector.emit(new Values(word, 1));
             }
         }
@@ -118,8 +111,9 @@ public class FastWordCountTopology {
         public void execute(Tuple tuple, BasicOutputCollector collector) {
             String word = tuple.getString(0);
             Integer count = counts.get(word);
-            if (count == null)
+            if (count == null) {
                 count = 0;
+            }
             count++;
             counts.put(word, count);
             collector.emit(new Values(word, count));
@@ -134,26 +128,26 @@ public class FastWordCountTopology {
     public static void printMetrics(Nimbus.Client client, String name) throws Exception {
         ClusterSummary summary = client.getClusterInfo();
         String id = null;
-        for (TopologySummary ts: summary.get_topologies()) {
+        for (TopologySummary ts : summary.get_topologies()) {
             if (name.equals(ts.get_name())) {
                 id = ts.get_id();
             }
         }
         if (id == null) {
-            throw new Exception("Could not find a topology named "+name);
+            throw new Exception("Could not find a topology named " + name);
         }
         TopologyInfo info = client.getTopologyInfo(id);
         int uptime = info.get_uptime_secs();
         long acked = 0;
         long failed = 0;
         double weightedAvgTotal = 0.0;
-        for (ExecutorSummary exec: info.get_executors()) {
+        for (ExecutorSummary exec : info.get_executors()) {
             if ("spout".equals(exec.get_component_id())) {
                 SpoutStats stats = exec.get_stats().get_specific().get_spout();
                 Map<String, Long> failedMap = stats.get_failed().get(":all-time");
                 Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
                 Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
-                for (String key: ackedMap.keySet()) {
+                for (String key : ackedMap.keySet()) {
                     if (failedMap != null) {
                         Long tmp = failedMap.get(key);
                         if (tmp != null) {
@@ -167,8 +161,8 @@ public class FastWordCountTopology {
                 }
             }
         }
-        double avgLatency = weightedAvgTotal/acked;
-        System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
+        double avgLatency = weightedAvgTotal / acked;
+        System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + (((double) acked) / uptime + " failed: " + failed));
     }
 
     public static void kill(Nimbus.Client client, String name) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
index a8fb6d0..1494313 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
@@ -19,10 +19,7 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
@@ -35,23 +32,25 @@ import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Since 5/4/16.
  */
-@SuppressWarnings({"serial"})
+@SuppressWarnings( {"serial"})
 public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements Runnable {
     private final static Logger LOG = LoggerFactory.getLogger(MockMetadataChangeNotifyService.class);
     @SuppressWarnings("unused")
-    private static final String[] topics = new String[]{"testTopic3", "testTopic4", "testTopic5"};
+    private static final String[] topics = new String[] {"testTopic3", "testTopic4", "testTopic5"};
     @SuppressWarnings("unused")
     private String topologyName;
     @SuppressWarnings("unused")
     private String spoutId;
     private Map<String, StreamDefinition> sds;
 
-    public MockMetadataChangeNotifyService(String topologyName, String spoutId){
+    public MockMetadataChangeNotifyService(String topologyName, String spoutId) {
         this.topologyName = topologyName;
         this.spoutId = spoutId;
     }
@@ -83,28 +82,29 @@ public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotif
         }
     }
 
-    private Map<String, StreamDefinition> defineStreamDefinitions(){
+    private Map<String, StreamDefinition> defineStreamDefinitions() {
         Map<String, StreamDefinition> sds = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamDefinitionsSpec.json"),
-                new TypeReference<Map<String, StreamDefinition>>() {});
+            new TypeReference<Map<String, StreamDefinition>>() {
+            });
         return sds;
     }
 
-    private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics){
+    private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics) {
         SpoutSpec newSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testSpoutSpec.json"), SpoutSpec.class);
         notifySpout(newSpec, sds);
     }
 
-    private void populateRouterMetadata(){
+    private void populateRouterMetadata() {
         RouterSpec boltSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamRouterBoltSpec.json"), RouterSpec.class);
         notifyStreamRouterBolt(boltSpec, sds);
     }
 
-    private void populateAlertBoltSpec(){
+    private void populateAlertBoltSpec() {
         AlertBoltSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testAlertBoltSpec.json"), AlertBoltSpec.class);
         notifyAlertBolt(spec, sds);
     }
 
-    private void notifyAlertPublishBolt(){
+    private void notifyAlertPublishBolt() {
         PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
         notifyAlertPublishBolt(spec, sds);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
index fa2f0a6..0ade06a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.eagle.alert.engine.topology;
 
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Ignore;
+import org.junit.Test;
+
 import java.io.FileWriter;
 import java.io.PrintWriter;
 import java.io.Serializable;
@@ -27,22 +33,14 @@ import java.nio.file.Paths;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Ignore;
-import org.junit.Test;
-
 /**
  * Created on 3/12/16.
  */
-@SuppressWarnings({"serial", "unchecked", "rawtypes", "resource"})
-public class SendData2KafkaTest implements Serializable{
+@SuppressWarnings( {"serial", "unchecked", "rawtypes", "resource"})
+public class SendData2KafkaTest implements Serializable {
     /**
-     *
      * {"timestamp": 10000, "metric": "esErrorLogEvent", "instanceUuid": "vm-InstanceId1", "host":"test-host1", "type":"nova", "stack":"NullPointException-..........."}
-     {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
-     *
+     * {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
      */
     @Test
     @Ignore

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
index 0ad6924..1c375fa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
@@ -20,27 +20,27 @@
 package org.apache.eagle.alert.engine.topology;
 
 
-import java.util.Map;
-
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * Created by yonzhang on 4/7/16.
  */
 @Ignore
-@SuppressWarnings({"rawtypes", "serial"})
+@SuppressWarnings( {"rawtypes", "serial"})
 public class TestBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
     private OutputCollector collector;
     private long count;
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;
@@ -50,7 +50,7 @@ public class TestBolt extends BaseRichBolt {
     public void execute(Tuple input) {
         LOG.info("data is coming: " + input);
         count++;
-        if(count % 10 == 0){
+        if (count % 10 == 0) {
             LOG.info("count = " + count);
         }
         collector.ack(input);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
index 666d167..29986f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
@@ -19,26 +19,26 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.nio.ByteBuffer;
-
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+
 /**
  * Storm 1.0.0 uses ByteBuffer, we need test it
  */
 public class TestByteBuffer {
     @Test
-    public void testBB(){
+    public void testBB() {
         ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put((byte)12);
+        bb.put((byte) 12);
         Assert.assertTrue(bb.hasArray());
         bb.rewind();
         Assert.assertEquals(12, bb.get());
     }
 
     @Test
-    public void testMultipleStrings() throws Exception{
+    public void testMultipleStrings() throws Exception {
         ByteBuffer bb = ByteBuffer.allocate(100);
         bb.put("abc".getBytes("UTF-8"));
         bb.put("xyz".getBytes("UTF-8"));