You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:04 UTC

[03/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
index 65b16d9..6804805 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestMetadataSpecSerDeser.java
@@ -19,30 +19,8 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
 import org.codehaus.jackson.type.TypeReference;
@@ -50,19 +28,21 @@ import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.*;
+
 /**
  * Since 5/6/16.
  */
 public class TestMetadataSpecSerDeser {
-    private String getStreamNameByTopic(String topic){
+    private String getStreamNameByTopic(String topic) {
         return topic + "Stream";
     }
 
     @Test
-    public void testStreamDefinitions(){
+    public void testStreamDefinitions() {
         Map<String, StreamDefinition> sds = new HashMap<>();
         List<String> topics = Arrays.asList("testTopic3", "testTopic4", "testTopic5");
-        for(String topic : topics) {
+        for (String topic : topics) {
             String streamId = getStreamNameByTopic(topic);
             if (topic.equals("testTopic3") || topic.equals("testTopic4")) {
                 StreamDefinition schema = new StreamDefinition();
@@ -72,7 +52,7 @@ public class TestMetadataSpecSerDeser {
                 column.setType(StreamColumn.Type.STRING);
                 schema.setColumns(Collections.singletonList(column));
                 sds.put(schema.getStreamId(), schema);
-            }else if(topic.equals("testTopic5")){
+            } else if (topic.equals("testTopic5")) {
                 StreamDefinition schema = new StreamDefinition();
                 schema.setStreamId(streamId);
                 StreamColumn column = new StreamColumn();
@@ -86,26 +66,27 @@ public class TestMetadataSpecSerDeser {
         String json = MetadataSerDeser.serialize(sds);
         System.out.println(json);
 
-        Map<String, StreamDefinition> deserializedSpec = MetadataSerDeser.deserialize(json, new TypeReference<Map<String, StreamDefinition>>(){});
+        Map<String, StreamDefinition> deserializedSpec = MetadataSerDeser.deserialize(json, new TypeReference<Map<String, StreamDefinition>>() {
+        });
         Assert.assertEquals(3, deserializedSpec.size());
     }
 
     @SuppressWarnings("unused")
     @Test
-    public void testSpoutSpec(){
+    public void testSpoutSpec() {
         String topologyName = "testTopology";
         String spoutId = "alertEngineSpout";
         List<String> plainStringTopics = Arrays.asList("testTopic3", "testTopic4");
         List<String> jsonStringTopics = Arrays.asList("testTopic5");
         Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
-        for(String topic : plainStringTopics) {
+        for (String topic : plainStringTopics) {
             Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
             kafka2TupleMetadata.setName(topic);
             kafka2TupleMetadata.setTopic(topic);
             kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
             kafka2TupleMetadataMap.put(topic, kafka2TupleMetadata);
         }
-        for(String topic : jsonStringTopics){
+        for (String topic : jsonStringTopics) {
             Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
             kafka2TupleMetadata.setName(topic);
             kafka2TupleMetadata.setTopic(topic);
@@ -115,7 +96,7 @@ public class TestMetadataSpecSerDeser {
 
         // construct Tuple2StreamMetadata
         Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap = new HashMap<>();
-        for(String topic : plainStringTopics) {
+        for (String topic : plainStringTopics) {
             String streamId = getStreamNameByTopic(topic);
             Tuple2StreamMetadata tuple2StreamMetadata = new Tuple2StreamMetadata();
             Set<String> activeStreamNames = new HashSet<>();
@@ -128,7 +109,7 @@ public class TestMetadataSpecSerDeser {
             tuple2StreamMetadataMap.put(topic, tuple2StreamMetadata);
         }
 
-        for(String topic : jsonStringTopics) {
+        for (String topic : jsonStringTopics) {
             String streamId = getStreamNameByTopic(topic);
             Tuple2StreamMetadata tuple2StreamMetadata = new Tuple2StreamMetadata();
             Set<String> activeStreamNames = new HashSet<>();
@@ -143,7 +124,7 @@ public class TestMetadataSpecSerDeser {
 
         // construct StreamRepartitionMetadata
         Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap = new HashMap<>();
-        for(String topic : plainStringTopics) {
+        for (String topic : plainStringTopics) {
             String streamId = getStreamNameByTopic(topic);
             StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata(topic, "defaultStringStream");
             StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
@@ -164,7 +145,7 @@ public class TestMetadataSpecSerDeser {
             streamRepartitionMetadataMap.put(topic, Arrays.asList(streamRepartitionMetadata));
         }
 
-        for(String topic : jsonStringTopics) {
+        for (String topic : jsonStringTopics) {
             String streamId = getStreamNameByTopic(topic);
             StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata(topic, "defaultStringStream");
             StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
@@ -195,10 +176,10 @@ public class TestMetadataSpecSerDeser {
     }
 
     @Test
-    public void testRouterBoltSpec(){
+    public void testRouterBoltSpec() {
         List<String> topics = Arrays.asList("testTopic3", "testTopic4", "testTopic5");
         RouterSpec boltSpec = new RouterSpec();
-        for(String topic : topics) {
+        for (String topic : topics) {
             String streamId = getStreamNameByTopic(topic);
             // StreamPartition, groupby col1 for stream cpuUsageStream
             StreamPartition sp = new StreamPartition();
@@ -229,11 +210,11 @@ public class TestMetadataSpecSerDeser {
     }
 
     @Test
-    public void testAlertBoltSpec(){
+    public void testAlertBoltSpec() {
         String topologyName = "testTopology";
         AlertBoltSpec spec = new AlertBoltSpec();
         List<String> topics = Arrays.asList("testTopic3", "testTopic4", "testTopic5");
-        for(String topic : topics) {
+        for (String topic : topics) {
             String streamId = getStreamNameByTopic(topic);
 
             // construct StreamPartition

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
index 0466ed3..c612d28 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormCustomGroupingRouting.java
@@ -19,15 +19,6 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.generated.GlobalStreamId;
@@ -42,45 +33,54 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Since 4/29/16.
  */
-@SuppressWarnings({ "rawtypes", "serial" })
+@SuppressWarnings( {"rawtypes", "serial"})
 public class TestStormCustomGroupingRouting implements Serializable {
     @Ignore
     @Test
-    public void testRoutingByCustomGrouping() throws Exception{
+    public void testRoutingByCustomGrouping() throws Exception {
         Config conf = new Config();
         conf.setNumWorkers(2); // use two worker processes
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint
 
         topologyBuilder.setBolt("green-bolt-1", new GreenBolt(0)).setNumTasks(2)
-                .customGrouping("blue-spout", new CustomStreamGrouping(){
-                    int count = 0;
-                    List<Integer> targetTask;
-                    @Override
-                    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-                        this.targetTask = targetTasks;
-                    }
-
-                    @Override
-                    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-                        if(count % 2 == 0) {
-                            count++;
-                            return Arrays.asList(targetTask.get(0));
-                        }else{
-                            count++;
-                            return Arrays.asList(targetTask.get(1));
-                        }
+            .customGrouping("blue-spout", new CustomStreamGrouping() {
+                int count = 0;
+                List<Integer> targetTask;
+
+                @Override
+                public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+                    this.targetTask = targetTasks;
+                }
+
+                @Override
+                public List<Integer> chooseTasks(int taskId, List<Object> values) {
+                    if (count % 2 == 0) {
+                        count++;
+                        return Arrays.asList(targetTask.get(0));
+                    } else {
+                        count++;
+                        return Arrays.asList(targetTask.get(1));
                     }
-                });
+                }
+            });
 
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
 
-        while(true) {
+        while (true) {
             try {
                 Thread.sleep(1000);
             } catch (Exception e) {
@@ -92,8 +92,10 @@ public class TestStormCustomGroupingRouting implements Serializable {
     private static class BlueSpout extends BaseRichSpout {
         int count = 0;
         private SpoutOutputCollector collector;
-        public BlueSpout(){
+
+        public BlueSpout() {
         }
+
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             declarer.declare(new Fields("a"));
@@ -106,16 +108,16 @@ public class TestStormCustomGroupingRouting implements Serializable {
 
         @Override
         public void nextTuple() {
-            if(count % 2 == 0) {
+            if (count % 2 == 0) {
                 this.collector.emit(Arrays.asList("testdata" + count));
                 count++;
-            }else{
+            } else {
                 this.collector.emit(Arrays.asList("testdata" + count));
                 count++;
             }
-            try{
+            try {
                 Thread.sleep(10000);
-            }catch(Exception ex){
+            } catch (Exception ex) {
 
             }
         }
@@ -123,9 +125,11 @@ public class TestStormCustomGroupingRouting implements Serializable {
 
     private static class GreenBolt extends BaseRichBolt {
         private int id;
-        public GreenBolt(int id){
+
+        public GreenBolt(int id) {
             this.id = id;
         }
+
         @Override
         public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
index 279a041..c235e73 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
@@ -19,12 +19,6 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.spout.SpoutOutputCollector;
@@ -36,38 +30,43 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Since 4/29/16.
  */
-@SuppressWarnings({"serial", "rawtypes"})
+@SuppressWarnings( {"serial", "rawtypes"})
 public class TestStormParallelism {
     /**
      * When run this test, please check the following through jstack and log
      * 1) for blue-spout, num of executors is 2, # of tasks is 2
-     *
+     * <p>
      * Expected:
-     *
+     * <p>
      * a. 2 threads uniquely named Thread-*-blue-spout-executor[*,*]
      * b. each thread will have single task
-     *
+     * <p>
      * 2) for green-bolt, num of executors is 2, # of tasks is 4
-     *
+     * <p>
      * Expected:
-     *
+     * <p>
      * a. 2 threads uniquely named Thread-*-green-bolt-executor[*,*]
      * b. each thread will have 2 tasks
-     *
+     * <p>
      * 3) for yellow-bolt, num of executors is 6, # of tasks is 6
-     *
+     * <p>
      * Expected:
-     *
+     * <p>
      * a. 6 threads uniquely named Thread-*-yellow-bolt-executor[*,*]
      * b. each thread will have 1 tasks
-     *
-     *
+     * <p>
+     * <p>
      * Continue to think:
-     *
+     * <p>
      * For alter engine, if we use multiple tasks per component instead of one task per component,
      * what will the parallelism mechanism affect?
      *
@@ -75,23 +74,23 @@ public class TestStormParallelism {
      */
     @Ignore
     @Test
-    public void testParallelism() throws Exception{
+    public void testParallelism() throws Exception {
         Config conf = new Config();
         conf.setNumWorkers(2); // use two worker processes
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint
 
         topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
-                .setNumTasks(4)
-                .shuffleGrouping("blue-spout");
+            .setNumTasks(4)
+            .shuffleGrouping("blue-spout");
 
         topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
-                .shuffleGrouping("green-bolt");
+            .shuffleGrouping("green-bolt");
 
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
 
-        while(true) {
+        while (true) {
             try {
                 Thread.sleep(1000);
             } catch (Exception e) {
@@ -100,10 +99,12 @@ public class TestStormParallelism {
         }
     }
 
-    private static class BlueSpout extends BaseRichSpout{
+    private static class BlueSpout extends BaseRichSpout {
         static int count = 0;
-        public BlueSpout(){
+
+        public BlueSpout() {
         }
+
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             declarer.declare(new Fields("a"));
@@ -121,8 +122,9 @@ public class TestStormParallelism {
         }
     }
 
-    private static class GreenBolt extends BaseRichBolt{
+    private static class GreenBolt extends BaseRichBolt {
         static int count;
+
         @Override
         public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
             count++;
@@ -140,8 +142,9 @@ public class TestStormParallelism {
         }
     }
 
-    private static class YellowBolt extends BaseRichBolt{
+    private static class YellowBolt extends BaseRichBolt {
         static int count;
+
         @Override
         public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
             count++;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
index 9d3892a..c27edf4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
@@ -19,13 +19,6 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.spout.SpoutOutputCollector;
@@ -37,30 +30,36 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 
 /**
  * Since 4/29/16.
  */
-@SuppressWarnings({"serial", "rawtypes", "unused"})
+@SuppressWarnings( {"serial", "rawtypes", "unused"})
 public class TestStormStreamIdRouting {
     @Ignore
     @Test
-    public void testRoutingByStreamId() throws Exception{
+    public void testRoutingByStreamId() throws Exception {
         Config conf = new Config();
         conf.setNumWorkers(2); // use two worker processes
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("blue-spout", new BlueSpout()); // parallelism hint
 
         topologyBuilder.setBolt("green-bolt-1", new GreenBolt(1))
-                .shuffleGrouping("blue-spout", "green-bolt-stream-1");
+            .shuffleGrouping("blue-spout", "green-bolt-stream-1");
         topologyBuilder.setBolt("green-bolt-2", new GreenBolt(2))
-                .shuffleGrouping("blue-spout", "green-bolt-stream-2");
+            .shuffleGrouping("blue-spout", "green-bolt-stream-2");
 
         LocalCluster cluster = new LocalCluster();
         cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
 
-        while(true) {
+        while (true) {
             try {
                 Thread.sleep(1000);
             } catch (Exception e) {
@@ -72,8 +71,10 @@ public class TestStormStreamIdRouting {
     private static class BlueSpout extends BaseRichSpout {
         int count = 0;
         private SpoutOutputCollector collector;
-        public BlueSpout(){
+
+        public BlueSpout() {
         }
+
         @Override
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             declarer.declareStream("green-bolt-stream-1", new Fields("a"));
@@ -87,16 +88,16 @@ public class TestStormStreamIdRouting {
 
         @Override
         public void nextTuple() {
-            if(count % 2 == 0) {
+            if (count % 2 == 0) {
                 this.collector.emit("green-bolt-stream-1", Arrays.asList("testdata" + count));
                 count++;
-            }else{
+            } else {
                 this.collector.emit("green-bolt-stream-2", Arrays.asList("testdata" + count));
                 count++;
             }
-            try{
+            try {
                 Thread.sleep(10000);
-            }catch(Exception ex){
+            } catch (Exception ex) {
 
             }
         }
@@ -104,9 +105,11 @@ public class TestStormStreamIdRouting {
 
     private static class GreenBolt extends BaseRichBolt {
         private int id;
-        public GreenBolt(int id){
+
+        public GreenBolt(int id) {
             this.id = id;
         }
+
         @Override
         public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         }
@@ -122,7 +125,7 @@ public class TestStormStreamIdRouting {
         }
     }
 
-    private static class YellowBolt extends BaseRichBolt{
+    private static class YellowBolt extends BaseRichBolt {
         @Override
         public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
index b0e5c4a..19ab1da 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
@@ -19,26 +19,20 @@
 
 package org.apache.eagle.alert.engine.topology;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter;
 import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.*;
+
 /**
  * Since 5/3/16.
  */
 public class TestTuple2StreamConverter {
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Test
-    public void test(){
+    public void test() {
         Tuple2StreamMetadata metadata = new Tuple2StreamMetadata();
         Set activeStreamNames = new HashSet<>();
         activeStreamNames.add("defaultStringStream");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
index 7132f5a..9647830 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
@@ -19,36 +19,35 @@
 
 package org.apache.eagle.alert.engine.topology;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 /**
  * Since 5/4/16.
  */
 public class TestUnitTopologyMain {
     @Ignore
     @Test
-    public void testTopologyRun(){
+    public void testTopologyRun() {
         testTopologyRun("/application-test.conf");
     }
 
-    public void testTopologyRun(String configResourceName){
+    public void testTopologyRun(String configResourceName) {
         ConfigFactory.invalidateCaches();
         System.setProperty("config.resource", configResourceName);
-        System.out.print("Set config.resource = "+configResourceName);
+        System.out.print("Set config.resource = " + configResourceName);
         Config config = ConfigFactory.load();
         String topologyId = config.getString("topology.name");
         MockMetadataChangeNotifyService changeNotifyService =
-                new MockMetadataChangeNotifyService(topologyId,"alertEngineSpout");
-        new UnitTopologyRunner(changeNotifyService).run(topologyId,config);
+            new MockMetadataChangeNotifyService(topologyId, "alertEngineSpout");
+        new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
     }
 
-    public static void main(String[] args){
-        if(args.length>0) {
+    public static void main(String[] args) {
+        if (args.length > 0) {
             new TestUnitTopologyMain().testTopologyRun(args[0]);
         } else {
             new TestUnitTopologyMain().testTopologyRun();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
index 8317943..3bfb1ab 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
@@ -16,13 +16,13 @@
  */
 package org.apache.eagle.alert.engine.utils;
 
-import java.io.IOException;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class CompressionUtilsTest {
     private final static Logger LOG = LoggerFactory.getLogger(CompressionUtilsTest.class);
 
@@ -33,11 +33,11 @@ public class CompressionUtilsTest {
         byte[] compressed = CompressionUtils.compress(original);
         byte[] decompressed = CompressionUtils.decompress(compressed);
 
-        LOG.info("original size: {}",original.length);
-        LOG.info("compressed size: {}",compressed.length);
-        LOG.info("decompressed size: {}",decompressed.length);
+        LOG.info("original size: {}", original.length);
+        LOG.info("compressed size: {}", compressed.length);
+        LOG.info("decompressed size: {}", decompressed.length);
 
         String decompressedValue = new String(decompressed);
-        Assert.assertEquals(value,decompressedValue);
+        Assert.assertEquals(value, decompressedValue);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
index cad53d4..b74e807 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.alert.engine.utils;
 
-import java.text.ParseException;
-
 import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
 import org.joda.time.Period;
@@ -25,6 +23,8 @@ import org.joda.time.Seconds;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.text.ParseException;
+
 public class TimePeriodUtilsTest {
     @Test
     public void testJodaTimePeriod() throws ParseException {
@@ -48,44 +48,44 @@ public class TimePeriodUtilsTest {
 
         Period period = new Period("PT15m");
         Seconds seconds = period.toStandardSeconds();
-        Assert.assertEquals(15*60,seconds.getSeconds());
+        Assert.assertEquals(15 * 60, seconds.getSeconds());
 
         long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12");
         long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:45:00");
-        long result = TimePeriodUtils.formatSecondsByPeriod(time,seconds);
-        Assert.assertEquals(expect,result);
+        long result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
+        Assert.assertEquals(expect, result);
 
         time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
         expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
         result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect,result);
+        Assert.assertEquals(expect, result);
 
         time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
         expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
         result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect,result);
+        Assert.assertEquals(expect, result);
     }
 
     @Test
-     public void testFormatSecondsByPeriod1H() throws ParseException {
+    public void testFormatSecondsByPeriod1H() throws ParseException {
 
         Period period = new Period("PT1h");
         Seconds seconds = period.toStandardSeconds();
-        Assert.assertEquals(60*60,seconds.getSeconds());
+        Assert.assertEquals(60 * 60, seconds.getSeconds());
 
         long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12");
         long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:00:00");
-        long result = TimePeriodUtils.formatSecondsByPeriod(time,seconds);
-        Assert.assertEquals(expect,result);
+        long result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
+        Assert.assertEquals(expect, result);
 
         time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
         expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
         result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect,result);
+        Assert.assertEquals(expect, result);
 
         time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:30:59");
         expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
         result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
-        Assert.assertEquals(expect,result);
+        Assert.assertEquals(expect, result);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
index 7094820..11df895 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
@@ -13,16 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_1",
+  "topology": {
+    "name": "alertUnitTopology_1",
     "numOfTotalWorkers": 20,
-    "numOfSpoutTasks" : 1,
-    "numOfRouterBolts" : 4,
-    "numOfAlertBolts" : 10,
-    "numOfPublishTasks" : 1,
-    "localMode" : "true"
+    "numOfSpoutTasks": 1,
+    "numOfRouterBolts": 4,
+    "numOfAlertBolts": 10,
+    "numOfPublishTasks": 1,
+    "localMode": "true"
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -32,29 +32,29 @@
     "stormKafkaStateUpdateIntervalMs": 2000,
     "stormKafkaFetchSizeBytes": 1048586,
   },
-  "zkConfig" : {
-    "zkQuorum" : "sandbox.hortonworks.com:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
+  "zkConfig": {
+    "zkQuorum": "sandbox.hortonworks.com:2181",
+    "zkRoot": "/alert",
+    "zkSessionTimeoutMs": 10000,
+    "connectionTimeoutMs": 10000,
+    "zkRetryTimes": 3,
+    "zkRetryInterval": 3000
   },
-  "dynamicConfigSource" : {
+  "dynamicConfigSource": {
     "initDelayMillis": 3000,
-    "delayMillis" : 10000
+    "delayMillis": 10000
   },
   "metadataService": {
-	"context" : "/rest",
-	"host" : "localhost",
-	"port" : 8080
+    "context": "/rest",
+    "host": "localhost",
+    "port": 8080
   },
   "coordinatorService": {
-  	"host": "localhost",
-  	"port": "8080",
-  	"context" : "/rest"
+    "host": "localhost",
+    "port": "8080",
+    "context": "/rest"
   },
   "kafkaProducer": {
-  	"bootstrapServers": "sandbox.hortonworks.com:6667"
+    "bootstrapServers": "sandbox.hortonworks.com:6667"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
index 7c2f8d8..ed4d638 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
@@ -1,17 +1,17 @@
 [
-	{
-		"name": "absenceAlertDataSource",
-		"type": "KAFKA",
-		"properties": {},
-		"topic": "absenceAlertTopic",
-		"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
-		"codec": {
-			"streamNameSelectorProp": {
-				"userProvidedStreamName": "noDataAlertStream"
-			},
-			"streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
-			"timestampColumn": "timestamp",
-			"timestampFormat": ""
-		}
-	}
+  {
+    "name": "absenceAlertDataSource",
+    "type": "KAFKA",
+    "properties": {},
+    "topic": "absenceAlertTopic",
+    "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "noDataAlertStream"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
index df04e79..d03c86e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
@@ -1,24 +1,26 @@
 [
-	{
-		"name": "absenceAlertPolicy",
-		"description": "absenceAlertPolicy",
-		"inputStreams": [
-			"absenceAlertStream"
-		],
-		"outputStreams": [
-			"absenceAlertStream_out"
-		],
-		"definition": {
-			"type": "absencealert",
-			"value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
-		},
-		"partitionSpec": [
-			{
-				"streamId": "absenceAlertStream",
-				"type": "GROUPBY",
-				"columns" : ["jobID"]
-			}
-		],
-		"parallelismHint": 2
-	}
+  {
+    "name": "absenceAlertPolicy",
+    "description": "absenceAlertPolicy",
+    "inputStreams": [
+      "absenceAlertStream"
+    ],
+    "outputStreams": [
+      "absenceAlertStream_out"
+    ],
+    "definition": {
+      "type": "absencealert",
+      "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+    },
+    "partitionSpec": [
+      {
+        "streamId": "absenceAlertStream",
+        "type": "GROUPBY",
+        "columns": [
+          "jobID"
+        ]
+      }
+    ],
+    "parallelismHint": 2
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
index b199e19..fca49e2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
@@ -1,20 +1,20 @@
 [
-	{
-		"name":"test-stream-output",
-		"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
-		"policyIds": [
-			"absenceAlertPolicy"
-		],
-		"properties": {
-			"subject":"Eagle Test Alert",
-			"template":"",
-			"sender": "sender@corp.com",
-			"recipients": "services@corp.com",
-			"smtp.server":"smtp.mailhost.com",
-			"connection": "plaintext",
-			"smtp.port": "25"
-		},
-		"dedupIntervalMin" : "PT5M",
-		"serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
-	}
+  {
+    "name": "test-stream-output",
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+    "policyIds": [
+      "absenceAlertPolicy"
+    ],
+    "properties": {
+      "subject": "Eagle Test Alert",
+      "template": "",
+      "sender": "sender@corp.com",
+      "recipients": "services@corp.com",
+      "smtp.server": "smtp.mailhost.com",
+      "connection": "plaintext",
+      "smtp.port": "25"
+    },
+    "dedupIntervalMin": "PT5M",
+    "serializer": "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
index c9cd7b1..4bd7319 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
@@ -1,29 +1,29 @@
 [
-	{
-		"streamId": "absenceAlertStream",
-		"dataSource": "absenceAlertDataSource",
-		"description": "the data stream for testing absence alert",
-		"validate": false,
-		"timeseries": false,
-		"columns": [
-			{
-				"name": "jobID",
-				"type": "STRING",
-				"defaultValue": "",
-				"required": true
-			},
-			{
-				"name": "timestamp",
-				"type": "LONG",
-				"defaultValue": 0,
-				"required": true
-			},
-			{
-				"name": "status",
-				"type": "STRING",
-				"defaultValue": "running",
-				"required": true
-			}
-		]
-	}
+  {
+    "streamId": "absenceAlertStream",
+    "dataSource": "absenceAlertDataSource",
+    "description": "the data stream for testing absence alert",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "jobID",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "status",
+        "type": "STRING",
+        "defaultValue": "running",
+        "required": true
+      }
+    ]
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
index afac207..8278b2d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
@@ -1,31 +1,31 @@
 [
-{
-	"name": "alertUnitTopology_1",
-	"numOfSpout":1,
-	"numOfAlertBolt": 10,
-	"numOfGroupBolt": 4,
-	"spoutId": "alertEngineSpout",
-	"groupNodeIds" : [
-		"streamRouterBolt0",
-		"streamRouterBolt1",
-		"streamRouterBolt2",
-		"streamRouterBolt3"
-	],
-	"alertBoltIds": [
-		"alertBolt0",
-		"alertBolt1",
-		"alertBolt2",
-		"alertBolt3",
-		"alertBolt4",
-		"alertBolt5",
-		"alertBolt6",
-		"alertBolt7",
-		"alertBolt8",
-		"alertBolt9"
-	],
-	"pubBoltId" : "alertPublishBolt",
-	"spoutParallelism": 1,
-	"groupParallelism": 1,
-	"alertParallelism": 1
-}
+  {
+    "name": "alertUnitTopology_1",
+    "numOfSpout": 1,
+    "numOfAlertBolt": 10,
+    "numOfGroupBolt": 4,
+    "spoutId": "alertEngineSpout",
+    "groupNodeIds": [
+      "streamRouterBolt0",
+      "streamRouterBolt1",
+      "streamRouterBolt2",
+      "streamRouterBolt3"
+    ],
+    "alertBoltIds": [
+      "alertBolt0",
+      "alertBolt1",
+      "alertBolt2",
+      "alertBolt3",
+      "alertBolt4",
+      "alertBolt5",
+      "alertBolt6",
+      "alertBolt7",
+      "alertBolt8",
+      "alertBolt9"
+    ],
+    "pubBoltId": "alertPublishBolt",
+    "spoutParallelism": 1,
+    "groupParallelism": 1,
+    "alertParallelism": 1
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
index 4cfcc75..5b4ff04 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
@@ -13,17 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_bug",
-    "numOfSpoutTasks" : 3,
-    "numOfRouterBolts" : 6,
-    "numOfAlertBolts" : 6,
-    "numOfPublishTasks" : 1,
-    "numOfTotalWorkers":1,
+  "topology": {
+    "name": "alertUnitTopology_bug",
+    "numOfSpoutTasks": 3,
+    "numOfRouterBolts": 6,
+    "numOfAlertBolts": 6,
+    "numOfPublishTasks": 1,
+    "numOfTotalWorkers": 1,
     "messageTimeoutSecs": 30,     // topology.message.timeout.secs: 30 by default
-    "localMode" : true
+    "localMode": true
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "localhost:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -33,34 +33,34 @@
     "stormKafkaStateUpdateIntervalMs": 2000,
     "stormKafkaFetchSizeBytes": 1048586,
   },
-  "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
+  "zkConfig": {
+    "zkQuorum": "localhost:2181",
+    "zkRoot": "/alert",
+    "zkSessionTimeoutMs": 10000,
+    "connectionTimeoutMs": 10000,
+    "zkRetryTimes": 3,
+    "zkRetryInterval": 3000
   },
-  "dynamicConfigSource" : {
+  "dynamicConfigSource": {
     "initDelayMillis": 3000,
-    "delayMillis" : 10000
+    "delayMillis": 10000
   },
   "metadataService": {
-    "context" : "/api",
-    "host" : "localhost",
-    "port" : 8080
+    "context": "/api",
+    "host": "localhost",
+    "port": 8080
   },
-  "metric":{
-    "tags":{
-      "topologyName":"alertUnitTopology_1"
+  "metric": {
+    "tags": {
+      "topologyName": "alertUnitTopology_1"
     }
     "sink": {
-//      "kafka": {
-//        "topic": "alert_metric_test"
-//        "bootstrap.servers": "localhost:9092"
-//      }
+      //      "kafka": {
+      //        "topic": "alert_metric_test"
+      //        "bootstrap.servers": "localhost:9092"
+      //      }
       "logger": {
-        "level":"DEBUG"
+        "level": "DEBUG"
       }
       "elasticsearch": {
         "hosts": ["localhost:9200"]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
index 4cd932f..61b6b49 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
@@ -13,17 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_1_test",
-    "numOfSpoutTasks" : 3,
-    "numOfRouterBolts" : 6,
-    "numOfAlertBolts" : 6,
-    "numOfPublishTasks" : 1,
-    "numOfTotalWorkers":1,
+  "topology": {
+    "name": "alertUnitTopology_1_test",
+    "numOfSpoutTasks": 3,
+    "numOfRouterBolts": 6,
+    "numOfAlertBolts": 6,
+    "numOfPublishTasks": 1,
+    "numOfTotalWorkers": 1,
     "messageTimeoutSecs": 30,     // topology.message.timeout.secs: 30 by default
-    "localMode" : true
+    "localMode": true
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "localhost:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -33,31 +33,31 @@
     "stormKafkaStateUpdateIntervalMs": 2000,
     "stormKafkaFetchSizeBytes": 1048586,
   },
-  "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
+  "zkConfig": {
+    "zkQuorum": "localhost:2181",
+    "zkRoot": "/alert",
+    "zkSessionTimeoutMs": 10000,
+    "connectionTimeoutMs": 10000,
+    "zkRetryTimes": 3,
+    "zkRetryInterval": 3000
   },
-  "dynamicConfigSource" : {
+  "dynamicConfigSource": {
     "initDelayMillis": 3000,
-    "delayMillis" : 10000
+    "delayMillis": 10000
   },
   "metadataService": {
-    "context" : "/api",
-    "host" : "localhost",
-    "port" : 8080
+    "context": "/api",
+    "host": "localhost",
+    "port": 8080
   },
-  "metric":{
+  "metric": {
     "sink": {
-//      "kafka": {
-//        "topic": "alert_metric_test"
-//        "bootstrap.servers": "localhost:9092"
-//      }
+      //      "kafka": {
+      //        "topic": "alert_metric_test"
+      //        "bootstrap.servers": "localhost:9092"
+      //      }
       "logger": {
-        "level":"INFO"
+        "level": "INFO"
       }
       "elasticsearch": {
         "hosts": ["localhost:9200"]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
index 80522fb..5a626d7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
@@ -13,16 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_2",
-    "numOfSpoutTasks" : 1,
-    "numOfRouterBolts" : 4,
-    "numOfAlertBolts" : 10,
-    "numOfPublishTasks" : 1,
+  "topology": {
+    "name": "alertUnitTopology_2",
+    "numOfSpoutTasks": 1,
+    "numOfRouterBolts": 4,
+    "numOfAlertBolts": 10,
+    "numOfPublishTasks": 1,
     "numOfTotalWorkers": 20,
-    "localMode" : "true"
+    "localMode": "true"
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "localhost:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -32,29 +32,29 @@
     "stormKafkaStateUpdateIntervalMs": 2000,
     "stormKafkaFetchSizeBytes": 1048586,
   },
-  "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
+  "zkConfig": {
+    "zkQuorum": "localhost:2181",
+    "zkRoot": "/alert",
+    "zkSessionTimeoutMs": 10000,
+    "connectionTimeoutMs": 10000,
+    "zkRetryTimes": 3,
+    "zkRetryInterval": 3000
   },
-  "dynamicConfigSource" : {
+  "dynamicConfigSource": {
     "initDelayMillis": 3000,
-    "delayMillis" : 10000
+    "delayMillis": 10000
   },
   "metadataService": {
-	"context" : "/rest",
-	"host" : "localhost",
-	"port" : 8080
+    "context": "/rest",
+    "host": "localhost",
+    "port": 8080
   },
   "coordinatorService": {
-  	"host": "localhost",
-  	"port": "8080",
-  	"context" : "/rest"
+    "host": "localhost",
+    "port": "8080",
+    "context": "/rest"
   },
   "kafkaProducer": {
-  	"bootstrapServers": "localhost:9092"
+    "bootstrapServers": "localhost:9092"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/datasources.json
index 946acd6..d16cea3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/datasources.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/datasources.json
@@ -1,37 +1,36 @@
 [
-{
-	"name": "eslog_datasource",
-	"type": "KAFKA",
-	"properties": {
-	},
-	"topic": "eslogs",
-	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
-	"codec": {
-		"streamNameSelectorProp": {
-			"userProvidedStreamName" : "esStream",
-			"streamNameFormat":"%s"
-		},
-		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
-		"timestampColumn": "timestamp",
-		"timestampFormat":""
-	}
-},
-{
-	"name": "bootfailure_datasource",
-	"type": "KAFKA",
-	"properties": {
-	},
-	"topic": "bootfailures",
-	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
-	"codec": {
-		"streamNameSelectorProp": {
-			"userProvidedStreamName" : "ifStream",
-			"streamNameFormat":"%s"
-		},
-		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
-		"timestampColumn": "timestamp",
-		"timestampFormat":""
-	}
-}
-
+  {
+    "name": "eslog_datasource",
+    "type": "KAFKA",
+    "properties": {
+    },
+    "topic": "eslogs",
+    "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "esStream",
+        "streamNameFormat": "%s"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  },
+  {
+    "name": "bootfailure_datasource",
+    "type": "KAFKA",
+    "properties": {
+    },
+    "topic": "bootfailures",
+    "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "ifStream",
+        "streamNameFormat": "%s"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/policies.json
index 4702575..31463cd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/policies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/policies.json
@@ -1,39 +1,39 @@
 [
-{
-	"name": "logStreamJoinBootFailure",
-	"description" : "policy to check host perfmon_cpu",
-	"inputStreams": [
-		"esStream",
-		"ifStream"
-	],
-	"outputStreams": [
-		"log_stream_join_output"
-	],
-	"definition": {
-		"type": "siddhi",
-		"value": " from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount insert into log_stream_join_output;  "
-	},
-	"partitionSpec": [
-		{
-			"streamId" : "esStream",
-			"type" : "GROUPBY",
-			"columns" : [
-				"instanceUuid"
-			],
-			"sortSpec": {
-				"windowPeriod" : "PT1M"
-			}
-		},
-		{
-			"streamId" : "ifStream",
-			"type" : "GROUPBY",
-			"columns" : [
-				"instanceUuid"
-			],
-			"sortSpec": {
-				"windowPeriod" : "PT1M"
-			}
-		}
-	]
-}
+  {
+    "name": "logStreamJoinBootFailure",
+    "description": "policy to check host perfmon_cpu",
+    "inputStreams": [
+      "esStream",
+      "ifStream"
+    ],
+    "outputStreams": [
+      "log_stream_join_output"
+    ],
+    "definition": {
+      "type": "siddhi",
+      "value": " from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount insert into log_stream_join_output;  "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "esStream",
+        "type": "GROUPBY",
+        "columns": [
+          "instanceUuid"
+        ],
+        "sortSpec": {
+          "windowPeriod": "PT1M"
+        }
+      },
+      {
+        "streamId": "ifStream",
+        "type": "GROUPBY",
+        "columns": [
+          "instanceUuid"
+        ],
+        "sortSpec": {
+          "windowPeriod": "PT1M"
+        }
+      }
+    ]
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
index 427526b..06d7b82 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
@@ -1,18 +1,18 @@
 [
-{
-	"name":"log-stream-join-output",
-	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
-	"policyIds": [
-		"log_stream_join_output"
-	],
-	"properties": {
-		"subject":"Eagle Test Alert",
-        "template":"",
-	  "sender": "sender@corp.com",
-	  "recipients": "receiver@corp.com",
-	  "smtp.server":"mailhost.com"
-	},
-	"dedupIntervalMin" : "PT1M",
-	"serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
-}
+  {
+    "name": "log-stream-join-output",
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+    "policyIds": [
+      "log_stream_join_output"
+    ],
+    "properties": {
+      "subject": "Eagle Test Alert",
+      "template": "",
+      "sender": "sender@corp.com",
+      "recipients": "receiver@corp.com",
+      "smtp.server": "mailhost.com"
+    },
+    "dedupIntervalMin": "PT1M",
+    "serializer": "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/streamdefinitions.json
index 0a26a28..d6603ac 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/streamdefinitions.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/streamdefinitions.json
@@ -1,93 +1,92 @@
 [
-{
-	"streamId": "esStream",
-	"dataSource" : "eslog_datasource",
-	"description":"the data stream for es log of different modules",
-	"validate": false,
-	"timeseries":false,
-	"columns": [
-		{
-			"name": "instanceUuid",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "timestamp",
-			"type" : "long",
-			"defaultValue": 0,
-			"required":true
-		},
-		{
-			"name": "logLevel",
-			"type" : "string",
-			"defaultValue": "ERROR",
-			"required": true
-		},
-		{
-			"name": "message",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "reqId",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "host",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "component",
-			"type" : "string",
-			"defaultValue": "nova",
-			"required":true
-		}
-	]
-}
-,
-{
-	"streamId": "ifStream",
-	"dataSource" : "bootfailure_datasource",
-	"description":"the data stream for boot failure(instance fault)",
-	"validate": false,
-	"timeseries":false,
-	"columns": [
-		{
-			"name": "instanceUuid",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "timestamp",
-			"type" : "long",
-			"defaultValue": 0,
-			"required":true
-		},
-		{
-			"name": "reqId",
-			"type" : "string",
-			"defaultValue": "",
-			"required": true
-		},
-		{
-			"name": "message",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "host",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		}
-	]
-}
+  {
+    "streamId": "esStream",
+    "dataSource": "eslog_datasource",
+    "description": "the data stream for es log of different modules",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "instanceUuid",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "long",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "logLevel",
+        "type": "string",
+        "defaultValue": "ERROR",
+        "required": true
+      },
+      {
+        "name": "message",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "reqId",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "host",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "component",
+        "type": "string",
+        "defaultValue": "nova",
+        "required": true
+      }
+    ]
+  },
+  {
+    "streamId": "ifStream",
+    "dataSource": "bootfailure_datasource",
+    "description": "the data stream for boot failure(instance fault)",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "instanceUuid",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "long",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "reqId",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "message",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "host",
+        "type": "string",
+        "defaultValue": "",
+        "required": true
+      }
+    ]
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/topologies.json
index 9aa8716..c865746 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/topologies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/topologies.json
@@ -1,31 +1,31 @@
 [
-{
-	"name": "alertUnitTopology_2",
-	"numOfSpout":1,
-	"numOfGroupBolt": 4,
-	"numOfAlertBolt": 10,
-	"spoutId": "alertEngineSpout",
-	"groupNodeIds" : [
-		"streamRouterBolt0",
-		"streamRouterBolt1",
-		"streamRouterBolt2",
-		"streamRouterBolt3"
-	],
-	"alertBoltIds": [
-		"alertBolt0",
-		"alertBolt1",
-		"alertBolt2",
-		"alertBolt3",
-		"alertBolt4",
-		"alertBolt5",
-		"alertBolt6",
-		"alertBolt7",
-		"alertBolt8",
-		"alertBolt9"
-	],
-	"pubBoltId" : "alertPublishBolt",
-	"spoutParallelism": 1,
-	"groupParallelism": 1,
-	"alertParallelism": 1
-}
+  {
+    "name": "alertUnitTopology_2",
+    "numOfSpout": 1,
+    "numOfGroupBolt": 4,
+    "numOfAlertBolt": 10,
+    "spoutId": "alertEngineSpout",
+    "groupNodeIds": [
+      "streamRouterBolt0",
+      "streamRouterBolt1",
+      "streamRouterBolt2",
+      "streamRouterBolt3"
+    ],
+    "alertBoltIds": [
+      "alertBolt0",
+      "alertBolt1",
+      "alertBolt2",
+      "alertBolt3",
+      "alertBolt4",
+      "alertBolt5",
+      "alertBolt6",
+      "alertBolt7",
+      "alertBolt8",
+      "alertBolt9"
+    ],
+    "pubBoltId": "alertPublishBolt",
+    "spoutParallelism": 1,
+    "groupParallelism": 1,
+    "alertParallelism": 1
+  }
 ]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
index 2b6f767..c607702 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
@@ -13,17 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_1_test",
-    "numOfSpoutTasks" : 3,
-    "numOfRouterBolts" : 6,
-    "numOfAlertBolts" : 6,
-    "numOfPublishTasks" : 1,
-    "numOfTotalWorkers":1,
+  "topology": {
+    "name": "alertUnitTopology_1_test",
+    "numOfSpoutTasks": 3,
+    "numOfRouterBolts": 6,
+    "numOfAlertBolts": 6,
+    "numOfPublishTasks": 1,
+    "numOfTotalWorkers": 1,
     "messageTimeoutSecs": 30,     // topology.message.timeout.secs: 30 by default
-    "localMode" : true
+    "localMode": true
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "localhost:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -33,21 +33,21 @@
     "stormKafkaStateUpdateIntervalMs": 2000,
     "stormKafkaFetchSizeBytes": 1048586,
   },
-  "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
+  "zkConfig": {
+    "zkQuorum": "localhost:2181",
+    "zkRoot": "/alert",
+    "zkSessionTimeoutMs": 10000,
+    "connectionTimeoutMs": 10000,
+    "zkRetryTimes": 3,
+    "zkRetryInterval": 3000
   },
-  "dynamicConfigSource" : {
+  "dynamicConfigSource": {
     "initDelayMillis": 3000,
-    "delayMillis" : 10000
+    "delayMillis": 10000
   },
   "metadataService": {
-    "context" : "/rest",
-    "host" : "localhost",
-    "port" : 8080
+    "context": "/rest",
+    "host": "localhost",
+    "port": 8080
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/application-e2e.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/application-e2e.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/application-e2e.conf
index 80522fb..5a626d7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/application-e2e.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/application-e2e.conf
@@ -13,16 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "topology" : {
-    "name" : "alertUnitTopology_2",
-    "numOfSpoutTasks" : 1,
-    "numOfRouterBolts" : 4,
-    "numOfAlertBolts" : 10,
-    "numOfPublishTasks" : 1,
+  "topology": {
+    "name": "alertUnitTopology_2",
+    "numOfSpoutTasks": 1,
+    "numOfRouterBolts": 4,
+    "numOfAlertBolts": 10,
+    "numOfPublishTasks": 1,
     "numOfTotalWorkers": 20,
-    "localMode" : "true"
+    "localMode": "true"
   },
-  "spout" : {
+  "spout": {
     "kafkaBrokerZkQuorum": "localhost:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
@@ -32,29 +32,29 @@
     "stormKafkaStateUpdateIntervalMs": 2000,
     "stormKafkaFetchSizeBytes": 1048586,
   },
-  "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
+  "zkConfig": {
+    "zkQuorum": "localhost:2181",
+    "zkRoot": "/alert",
+    "zkSessionTimeoutMs": 10000,
+    "connectionTimeoutMs": 10000,
+    "zkRetryTimes": 3,
+    "zkRetryInterval": 3000
   },
-  "dynamicConfigSource" : {
+  "dynamicConfigSource": {
     "initDelayMillis": 3000,
-    "delayMillis" : 10000
+    "delayMillis": 10000
   },
   "metadataService": {
-	"context" : "/rest",
-	"host" : "localhost",
-	"port" : 8080
+    "context": "/rest",
+    "host": "localhost",
+    "port": 8080
   },
   "coordinatorService": {
-  	"host": "localhost",
-  	"port": "8080",
-  	"context" : "/rest"
+    "host": "localhost",
+    "port": "8080",
+    "context": "/rest"
   },
   "kafkaProducer": {
-  	"bootstrapServers": "localhost:9092"
+    "bootstrapServers": "localhost:9092"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/datasources.json
index 6d745a9..09d1b97 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/datasources.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/datasources.json
@@ -1,21 +1,19 @@
 [
-
-{
-	"name": "network_syslog_datasource",
-	"type": "KAFKA",
-	"properties": {
-	},
-	"topic": "syslog_events",
-	"schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme",
-	"codec": {
-		"streamNameSelectorProp": {
-			"userProvidedStreamName" : "syslog_stream",
-			"streamNameFormat":"%s"
-		},
-		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
-		"timestampColumn": "timestamp",
-		"timestampFormat":""
-	}
-}
-
+  {
+    "name": "network_syslog_datasource",
+    "type": "KAFKA",
+    "properties": {
+    },
+    "topic": "syslog_events",
+    "schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "syslog_stream",
+        "streamNameFormat": "%s"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/policies.json
index ba013b2..0848585 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/policies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/policies.json
@@ -1,28 +1,26 @@
 [
-
-{
-	"name": "syslog_severity_check",
-	"description" : "syslog.severity > 3 then error",
-	"inputStreams": [
-		"syslog_stream"
-	],
-	"outputStreams": [
-		"syslog_severity_check_output"
-	],
-	"definition": {
-		"type": "siddhi",
-		"value": "from syslog_stream[dims_severity == \"NOTICE\"] select * insert into syslog_severity_check_output;"
-	},
-	"partitionSpec": [
-		{
-			"streamId" : "syslog_stream",
-			"type" : "GROUPBY",
-			"columns" : [
-				"dims_hostname"
-			]
-		}
-	],
-	"parallelismHint": 10
-}
-
+  {
+    "name": "syslog_severity_check",
+    "description": "syslog.severity > 3 then error",
+    "inputStreams": [
+      "syslog_stream"
+    ],
+    "outputStreams": [
+      "syslog_severity_check_output"
+    ],
+    "definition": {
+      "type": "siddhi",
+      "value": "from syslog_stream[dims_severity == \"NOTICE\"] select * insert into syslog_severity_check_output;"
+    },
+    "partitionSpec": [
+      {
+        "streamId": "syslog_stream",
+        "type": "GROUPBY",
+        "columns": [
+          "dims_hostname"
+        ]
+      }
+    ],
+    "parallelismHint": 10
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/publishments.json
index a9b85b8..0d56012 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/publishments.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/publishments.json
@@ -1,17 +1,17 @@
 [
-
-{
-  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
-  "name":"network-syslog-publish",
-  "policyIds": ["syslog_severity_check"],
-  "dedupIntervalMin": "PT0M",
-  "properties":{
-    "kafka_broker":"localhost:9092",
-    "topic":"syslog_alerts",
-    "value_deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
-    "value_serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
-  },
-  "serializer" : "org.apache.eagle.alert.engine.extension.SherlockAlertSerializer"
-}
-
+  {
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "name": "network-syslog-publish",
+    "policyIds": [
+      "syslog_severity_check"
+    ],
+    "dedupIntervalMin": "PT0M",
+    "properties": {
+      "kafka_broker": "localhost:9092",
+      "topic": "syslog_alerts",
+      "value_deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+      "value_serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
+    },
+    "serializer": "org.apache.eagle.alert.engine.extension.SherlockAlertSerializer"
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/sherlock.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/sherlock.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/sherlock.json
index a0eac01..ec5a13b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/sherlock.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/sherlock.json
@@ -1,4 +1,5 @@
-{"SchemaCompModel": {
+{
+  "SchemaCompModel": {
     "syslog": {
       "demo": {
         "tsdType": "SherlockLog",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/streamdefinitions.json
index fafa8aa..082e068 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/streamdefinitions.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/streamdefinitions.json
@@ -1,76 +1,83 @@
 [
-
-{
-	"streamId": "syslog_stream",
-	"dataSource" : "network_syslog_datasource",
-	"description":"the data stream for syslog events",
-	"validate": false,
-	"timeseries":false,
-	"columns": [
-		{
-			"name": "dims_facility",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required":true
-		},{
-			"name": "dims_severity",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},
-		{
-			"name": "dims_hostname",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},
-		{
-			"name": "dims_msgid",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},{
-			"name": "timestamp",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required":true
-		},{
-			"name": "conn",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},{
-			"name": "op",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required":true
-		},{
-			"name": "msgId",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required":true
-		},{
-			"name": "command",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},{
-			"name": "name",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},{
-			"name": "namespace",
-			"type" : "STRING",
-			"defaultValue": "",
-			"required": true
-		},{
-			"name": "epochMillis",
-			"type" : "LONG",
-			"defaultValue": 0,
-			"required": true
-		}
-	]
-}
-
+  {
+    "streamId": "syslog_stream",
+    "dataSource": "network_syslog_datasource",
+    "description": "the data stream for syslog events",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "dims_facility",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_severity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_hostname",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "dims_msgid",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "conn",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "op",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "msgId",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "command",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "name",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "namespace",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "epochMillis",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      }
+    ]
+  }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/topologies.json
index 9aa8716..c865746 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/topologies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/e2e/topologies.json
@@ -1,31 +1,31 @@
 [
-{
-	"name": "alertUnitTopology_2",
-	"numOfSpout":1,
-	"numOfGroupBolt": 4,
-	"numOfAlertBolt": 10,
-	"spoutId": "alertEngineSpout",
-	"groupNodeIds" : [
-		"streamRouterBolt0",
-		"streamRouterBolt1",
-		"streamRouterBolt2",
-		"streamRouterBolt3"
-	],
-	"alertBoltIds": [
-		"alertBolt0",
-		"alertBolt1",
-		"alertBolt2",
-		"alertBolt3",
-		"alertBolt4",
-		"alertBolt5",
-		"alertBolt6",
-		"alertBolt7",
-		"alertBolt8",
-		"alertBolt9"
-	],
-	"pubBoltId" : "alertPublishBolt",
-	"spoutParallelism": 1,
-	"groupParallelism": 1,
-	"alertParallelism": 1
-}
+  {
+    "name": "alertUnitTopology_2",
+    "numOfSpout": 1,
+    "numOfGroupBolt": 4,
+    "numOfAlertBolt": 10,
+    "spoutId": "alertEngineSpout",
+    "groupNodeIds": [
+      "streamRouterBolt0",
+      "streamRouterBolt1",
+      "streamRouterBolt2",
+      "streamRouterBolt3"
+    ],
+    "alertBoltIds": [
+      "alertBolt0",
+      "alertBolt1",
+      "alertBolt2",
+      "alertBolt3",
+      "alertBolt4",
+      "alertBolt5",
+      "alertBolt6",
+      "alertBolt7",
+      "alertBolt8",
+      "alertBolt9"
+    ],
+    "pubBoltId": "alertPublishBolt",
+    "spoutParallelism": 1,
+    "groupParallelism": 1,
+    "alertParallelism": 1
+  }
 ]