You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:10 UTC

[09/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 87faf82..7a93e72 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -19,6 +19,12 @@
 
 package org.apache.eagle.alert.engine.runner;
 
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
@@ -28,11 +34,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,24 +50,24 @@ import java.util.List;
  */
 public class UnitTopologyRunner {
     private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class);
-    public final static String spoutName = "alertEngineSpout";
-    private final static String streamRouterBoltNamePrefix = "streamRouterBolt";
-    private final static String alertBoltNamePrefix = "alertBolt";
-    public final static String alertPublishBoltName = "alertPublishBolt";
-
-    public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
-    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public final static String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
-    public final static String ALERT_TASK_NUM = "topology.numOfAlertBolts";
-    public final static String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
-    public final static String LOCAL_MODE = "topology.localMode";
-    public final static String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
-    public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
+    public static final String spoutName = "alertEngineSpout";
+    private static final String streamRouterBoltNamePrefix = "streamRouterBolt";
+    private static final String alertBoltNamePrefix = "alertBolt";
+    public static final String alertPublishBoltName = "alertPublishBolt";
+
+    public static final String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers";
+    public static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
+    public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts";
+    public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
+    public static final String LOCAL_MODE = "topology.localMode";
+    public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
+    public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
 
     private final IMetadataChangeNotifyService metadataChangeNotifyService;
     private backtype.storm.Config givenStormConfig = null;
 
-    public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){
+    public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService) {
         this.metadataChangeNotifyService = metadataChangeNotifyService;
     }
 
@@ -75,15 +77,15 @@ public class UnitTopologyRunner {
     }
 
     public StormTopology buildTopology(String topologyId,
-                              int numOfSpoutTasks,
-                              int numOfRouterBolts,
-                              int numOfAlertBolts,
-                              int numOfPublishTasks,
-                              Config config) {
+                                       int numOfSpoutTasks,
+                                       int numOfRouterBolts,
+                                       int numOfAlertBolts,
+                                       int numOfPublishTasks,
+                                       Config config) {
 
         StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
         AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
-        AlertPublisherBolt publisherBolt;
+
 
         TopologyBuilder builder = new TopologyBuilder();
 
@@ -93,27 +95,27 @@ public class UnitTopologyRunner {
         builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
 
         // construct StreamRouterBolt objects
-        for(int i=0; i<numOfRouterBolts; i++){
+        for (int i = 0; i < numOfRouterBolts; i++) {
             routerBolts[i] = new StreamRouterBolt(streamRouterBoltNamePrefix + i, config, getMetadataChangeNotifyService());
         }
 
         // construct AlertBolt objects
-        for(int i=0; i<numOfAlertBolts; i++){
-            alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, config, getMetadataChangeNotifyService());
+        for (int i = 0; i < numOfAlertBolts; i++) {
+            alertBolts[i] = new AlertBolt(alertBoltNamePrefix + i, config, getMetadataChangeNotifyService());
         }
 
         // construct AlertPublishBolt object
-        publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
+        AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
 
         // connect spout and router bolt, also define output streams for downstreaming alert bolt
-        for(int i=0; i<numOfRouterBolts; i++){
+        for (int i = 0; i < numOfRouterBolts; i++) {
             String boltName = streamRouterBoltNamePrefix + i;
 
             // define output streams, which are based on
             String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, boltName);
             List<String> outputStreamIds = new ArrayList<>(numOfAlertBolts);
-            for(int j=0; j<numOfAlertBolts; j++){
-                String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix+j);
+            for (int j = 0; j < numOfAlertBolts; j++) {
+                String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix + j);
                 outputStreamIds.add(sid);
             }
             routerBolts[i].declareOutputStreams(outputStreamIds);
@@ -126,79 +128,79 @@ public class UnitTopologyRunner {
         }
 
         // connect router bolt and alert bolt, also define output streams for downstreaming alert publish bolt
-        for(int i=0; i<numOfAlertBolts; i++){
+        for (int i = 0; i < numOfAlertBolts; i++) {
             String boltName = alertBoltNamePrefix + i;
             BoltDeclarer boltDeclarer = builder.setBolt(boltName, alertBolts[i]);
-            for(int j=0; j<numOfRouterBolts; j++) {
-                String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix+j, boltName);
-                boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix+j, streamId, new Fields());
+            for (int j = 0; j < numOfRouterBolts; j++) {
+                String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix + j, boltName);
+                boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix + j, streamId, new Fields());
             }
         }
 
         // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline
         BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks);
-        for(int i=0; i<numOfAlertBolts; i++) {
-            boltDeclarer.fieldsGrouping(alertBoltNamePrefix+i, new Fields(AlertConstants.FIELD_0));
+        for (int i = 0; i < numOfAlertBolts; i++) {
+            boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0));
         }
 
         return builder.createTopology();
     }
 
+    public StormTopology buildTopology(String topologyId, Config config) {
+        int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
+        int numOfRouterBolts = config.getInt("topology.numOfRouterBolts");
+        int numOfAlertBolts = config.getInt("topology.numOfAlertBolts");
+        int numOfPublishTasks = config.getInt("topology.numOfPublishTasks");
+        return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+    }
+
     private void run(String topologyId,
-                    int numOfTotalWorkers,
-                    int numOfSpoutTasks,
-                    int numOfRouterBolts,
-                    int numOfAlertBolts,
-                    int numOfPublishTasks,
-                    Config config,
-                    boolean localMode) {
+                     int numOfTotalWorkers,
+                     int numOfSpoutTasks,
+                     int numOfRouterBolts,
+                     int numOfAlertBolts,
+                     int numOfPublishTasks,
+                     Config config,
+                     boolean localMode) {
 
         backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig;
         // TODO: Configurable metric consumer instance number
 
-        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
-        LOG.info("Set topology.message.timeout.secs as {}",messageTimeoutSecs);
+        int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
+        LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs);
         stormConfig.setMessageTimeoutSecs(messageTimeoutSecs);
 
-        if(config.hasPath("metric")) {
-            stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()),1);
+        if (config.hasPath("metric")) {
+            stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
         }
 
         stormConfig.setNumWorkers(numOfTotalWorkers);
         StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
 
-        if(localMode) {
+        if (localMode) {
             LOG.info("Submitting as local mode");
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology(topologyId, stormConfig, topology);
             Utils.sleep(Long.MAX_VALUE);
-        }else{
+        } else {
             LOG.info("Submitting as cluster mode");
             try {
                 StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology);
-            } catch(Exception ex) {
+            } catch (Exception ex) {
                 LOG.error("fail submitting topology {}", topology, ex);
                 throw new IllegalStateException(ex);
             }
         }
     }
 
-    public void run(String topologyId,Config config) {
+    public void run(String topologyId, Config config) {
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
         int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
         int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
         boolean localMode = config.getBoolean(LOCAL_MODE);
         int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
-        run(topologyId,numOfTotalWorkers, numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config, localMode);
-    }
-
-    public StormTopology buildTopology(String topologyId,Config config) {
-        int numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
-        int numOfRouterBolts = config.getInt("topology.numOfRouterBolts");
-        int numOfAlertBolts = config.getInt("topology.numOfAlertBolts");
-        int numOfPublishTasks = config.getInt("topology.numOfPublishTasks");
-        return buildTopology(topologyId,numOfSpoutTasks,numOfRouterBolts,numOfAlertBolts,numOfPublishTasks,config);
+        run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode);
     }
 
     public IMetadataChangeNotifyService getMetadataChangeNotifyService() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
index c1da90f..43df203 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
@@ -19,19 +19,18 @@
 
 package org.apache.eagle.alert.engine.scheme;
 
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-
 /**
- * Expects flat Json scheme
+ * Expects flat Json scheme.
  */
 public class JsonScheme implements Scheme {
     private static final long serialVersionUID = -8352896475656975577L;
@@ -54,16 +53,18 @@ public class JsonScheme implements Scheme {
     @SuppressWarnings("rawtypes")
     public List<Object> deserialize(byte[] ser) {
         try {
-            if(ser != null ) {
+            if (ser != null) {
                 Map map = mapper.readValue(ser, Map.class);
                 return Arrays.asList(topic, map);
-            }else{
-                if(LOG.isDebugEnabled()) LOG.debug("Content is null, ignore");
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Content is null, ignore");
+                }
             }
         } catch (IOException e) {
             try {
                 LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e);
-            }catch(Exception ex){
+            } catch (Exception ex) {
                 LOG.error(ex.getMessage(), ex);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
index 226dd84..4e02edb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
@@ -19,23 +19,22 @@
 
 package org.apache.eagle.alert.engine.scheme;
 
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.eagle.alert.coordination.model.StreamNameSelector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Properties;
+
 /**
  * A strategy to get stream name from message tuple.
- * 
- * Since 5/5/16.
+ * @since 5/5/16.
  */
 public class JsonStringStreamNameSelector implements StreamNameSelector {
-    private final static Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
-    public final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
-    public final static String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
-    public final static String STREAM_NAME_FORMAT = "streamNameFormat";
+    private static final  Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
+    public static final  String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+    public static final  String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
+    public static final  String STREAM_NAME_FORMAT = "streamNameFormat";
 
     private String userProvidedStreamName;
     private String[] fieldNamesToInferStreamName;
@@ -70,5 +69,5 @@ public class JsonStringStreamNameSelector implements StreamNameSelector {
         }
         return "defaultStringStream";
     }
-    
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
index 194b0c2..57c8897 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
@@ -19,21 +19,20 @@
 
 package org.apache.eagle.alert.engine.scheme;
 
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
 /**
- * used for parsing plain string
+ * Used for parsing plain strings.
  */
 public class PlainStringScheme implements Scheme {
     private static final long serialVersionUID = 5969724968671646310L;
@@ -42,7 +41,7 @@ public class PlainStringScheme implements Scheme {
     private String topic;
 
     @SuppressWarnings("rawtypes")
-    public PlainStringScheme(String topic, Map conf){
+    public PlainStringScheme(String topic, Map conf) {
         this.topic = topic;
     }
 
@@ -57,7 +56,7 @@ public class PlainStringScheme implements Scheme {
         return new Fields(STRING_SCHEME_KEY);
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Override
     public List<Object> deserialize(byte[] ser) {
         Map m = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
index 61ec943..0b88483 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
@@ -19,29 +19,31 @@
 
 package org.apache.eagle.alert.engine.scheme;
 
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.eagle.alert.coordination.model.StreamNameSelector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Properties;
+
 /**
  * Since 5/3/16.
  */
 public class PlainStringStreamNameSelector implements StreamNameSelector {
     @SuppressWarnings("unused")
     private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class);
-    private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+    private static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
     private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream";
 
     private String streamName;
 
-    public PlainStringStreamNameSelector(Properties prop){
+    public PlainStringStreamNameSelector(Properties prop) {
         streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
-        if(streamName == null)
+        if (streamName == null) {
             streamName = DEFAULT_STRING_STREAM_NAME;
+        }
     }
+
     @Override
     public String getStreamName(Map<String, Object> tuple) {
         return streamName;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
index 03c1dfb..1e8f440 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
@@ -16,18 +16,18 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
 import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /**
- * TODO: Seams the complexity dosen't bring enough performance improve
+ * TODO: Seems the complexity doesn't bring enough performance improvement.
  *
  * @see PartitionedEvent
  */
@@ -36,7 +36,7 @@ public class PartitionedEventDigestSerializer implements Serializer<PartitionedE
     private final Serializer<StreamEvent> streamEventSerializer;
     private final Serializer<StreamPartition> streamPartitionSerializer;
 
-    public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider){
+    public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider) {
         this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
         this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE;
     }
@@ -44,8 +44,8 @@ public class PartitionedEventDigestSerializer implements Serializer<PartitionedE
     @Override
     public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
         dataOutput.writeLong(entity.getPartitionKey());
-        streamEventSerializer.serialize(entity.getEvent(),dataOutput);
-        streamPartitionSerializer.serialize(entity.getPartition(),dataOutput);
+        streamEventSerializer.serialize(entity.getEvent(), dataOutput);
+        streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
index f653361..428ad34 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
@@ -16,24 +16,13 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
-import java.io.IOException;
-
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 
+import java.io.IOException;
+
 public interface PartitionedEventSerializer {
-    /**
-     *
-     * @param entity
-     * @return
-     * @throws IOException
-     */
+
     byte[] serialize(PartitionedEvent entity) throws IOException;
 
-    /**
-     *
-     * @param bytes
-     * @return
-     * @throws IOException
-     */
     PartitionedEvent deserialize(byte[] bytes) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
index 69bb695..42f0559 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
@@ -19,15 +19,13 @@ package org.apache.eagle.alert.engine.serialization;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
 
-import java.io.IOException;
-
 /**
- * Integration interface to provide stream definition for serializer
+ * Integration interface to provide stream definition for serializer.
  */
 public interface SerializationMetadataProvider {
     /**
      * @param streamId
-     * @return StreamDefinition or null if not exist
+     * @return StreamDefinition or null if not exist.
      */
     StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
index c2f87d0..599152e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
@@ -21,6 +21,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 public interface Serializer<V> {
-    void serialize(V value,DataOutput dataOutput) throws IOException;
+    void serialize(V value, DataOutput dataOutput) throws IOException;
+
     V deserialize(DataInput dataInput) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
index 6be8f1a..a84b5dc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
@@ -16,49 +16,42 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.serialization.impl.*;
+
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.serialization.impl.BooleanSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.DoubleSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.FloatSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.IntegerSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.JavaObjectSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.LongSerializer;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.alert.engine.serialization.impl.StringSerializer;
-
 public class Serializers {
-    private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
+    private static final Map<StreamColumn.Type, Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
 
-    public static <T> void register(StreamColumn.Type type,Serializer<T> serializer){
-        if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
-            throw new IllegalArgumentException("Duplicated column type: "+type);
+    public static <T> void register(StreamColumn.Type type, Serializer<T> serializer) {
+        if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
+            throw new IllegalArgumentException("Duplicated column type: " + type);
         }
-        COLUMN_TYPE_SER_MAPPING.put(type,serializer);
+        COLUMN_TYPE_SER_MAPPING.put(type, serializer);
     }
 
     @SuppressWarnings("unchecked")
-    public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){
-        if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
+    public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type) {
+        if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
             return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);
-        }else{
-            throw new IllegalArgumentException("Serializer of type: "+type+" not found");
+        } else {
+            throw new IllegalArgumentException("Serializer of type: " + type + " not found");
         }
     }
 
-    public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider){
+    public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider) {
         return new PartitionedEventSerializerImpl(metadataProvider);
     }
 
     static {
-        register(StreamColumn.Type.STRING,new StringSerializer());
-        register(StreamColumn.Type.INT,new IntegerSerializer());
-        register(StreamColumn.Type.LONG,new LongSerializer());
-        register(StreamColumn.Type.FLOAT,new FloatSerializer());
-        register(StreamColumn.Type.DOUBLE,new DoubleSerializer());
-        register(StreamColumn.Type.BOOL,new BooleanSerializer());
-        register(StreamColumn.Type.OBJECT,new JavaObjectSerializer());
+        register(StreamColumn.Type.STRING, new StringSerializer());
+        register(StreamColumn.Type.INT, new IntegerSerializer());
+        register(StreamColumn.Type.LONG, new LongSerializer());
+        register(StreamColumn.Type.FLOAT, new FloatSerializer());
+        register(StreamColumn.Type.DOUBLE, new DoubleSerializer());
+        register(StreamColumn.Type.BOOL, new BooleanSerializer());
+        register(StreamColumn.Type.OBJECT, new JavaObjectSerializer());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
index db91a70..1e90569 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
index f2f5359..ad5f53c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
index e6b510a..18089a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
index f784456..d2473a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
index 39baf2b..14d9ea5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
@@ -16,14 +16,14 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+import org.apache.commons.lang3.SerializationUtils;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 public class JavaObjectSerializer implements Serializer<Object> {
     @Override
     public void serialize(Object value, DataOutput dataOutput) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
index 116b275..8d85c76 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
index 5a3d77d..2b0140f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -27,23 +23,27 @@ import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.engine.serialization.Serializer;
 import org.apache.eagle.alert.engine.utils.CompressionUtils;
-
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /**
  * Stream Metadata Cached Serializer
  *
- * Performance:
+ * <p>Performance:
  *
  * 1) VS Kryo Direct: reduce 73.4% space (bytes) and 42.5 % time (ms).
  * 2) VS Java Native: reduce 92.5% space (bytes) and 94.2% time (ms)
+ * </p>
  *
- * Tips:
- *
+ * <p>Tips:
  * 1) Without-compression performs better than with compression for small event
+ * </p>
  *
- * TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?
+ * <p>TODO: Caching the partition would save little space but almost half of the serialization time; how should the performance be balanced?</p>
  *
  * @see PartitionedEvent
  */
@@ -53,10 +53,10 @@ public class PartitionedEventSerializerImpl implements Serializer<PartitionedEve
     private final boolean compress;
 
     /**
-     * @param serializationMetadataProvider metadata provider
-     * @param compress false by default
+     * @param serializationMetadataProvider metadata provider.
+     * @param compress                      false by default.
      */
-    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider,boolean compress) {
+    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider, boolean compress) {
         this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
         this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
         this.compress = compress;
@@ -76,6 +76,13 @@ public class PartitionedEventSerializerImpl implements Serializer<PartitionedEve
     }
 
     @Override
+    public byte[] serialize(PartitionedEvent entity) throws IOException {
+        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
+        this.serialize(entity, dataOutput);
+        return compress ? CompressionUtils.compress(dataOutput.toByteArray()) : dataOutput.toByteArray();
+    }
+
+    @Override
     public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
         PartitionedEvent event = new PartitionedEvent();
         event.setPartitionKey(dataInput.readLong());
@@ -87,15 +94,9 @@ public class PartitionedEventSerializerImpl implements Serializer<PartitionedEve
         return event;
     }
 
-    @Override
-    public byte[] serialize(PartitionedEvent entity) throws IOException {
-        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
-        this.serialize(entity,dataOutput);
-        return compress ? CompressionUtils.compress(dataOutput.toByteArray()):dataOutput.toByteArray();
-    }
 
     @Override
     public PartitionedEvent deserialize(byte[] bytes) throws IOException {
-        return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes):bytes));
+        return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes) : bytes));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
index e13b23f..d7119db 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -29,25 +29,22 @@ import java.io.IOException;
 import java.util.BitSet;
 
 /**
+ * Serializer for {@link StreamEvent} that picks each column's serializer from the event's stream definition.
+ *
  * @see StreamEvent
  */
 public class StreamEventSerializer implements Serializer<StreamEvent> {
     private final SerializationMetadataProvider serializationMetadataProvider;
 
-    public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider){
+    public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider) {
         this.serializationMetadataProvider = serializationMetadataProvider;
     }
 
-    /**
-     *
-     * @param objects
-     * @return
-     */
-    private BitSet isNullBitSet(Object[] objects){
+    private BitSet isNullBitSet(Object[] objects) {
         BitSet bitSet = new BitSet();
         int i = 0;
-        for(Object obj:objects){
-            bitSet.set(i,obj == null);
+        for (Object obj : objects) {
+            bitSet.set(i, obj == null);
             i++;
         }
         return bitSet;
@@ -58,28 +55,30 @@ public class StreamEventSerializer implements Serializer<StreamEvent> {
         // Bryant: here "metaVersion/streamId" writes to dataOutputUTF
         String metaVersion = event.getMetaVersion();
         String streamId = event.getStreamId();
-        String metaVersion_streamId = String.format("%s/%s", metaVersion, streamId);
+        String metaVersionStreamId = String.format("%s/%s", metaVersion, streamId);
 
-        dataOutput.writeUTF(metaVersion_streamId);
+        dataOutput.writeUTF(metaVersionStreamId);
         dataOutput.writeLong(event.getTimestamp());
-        if(event.getData() == null || event.getData().length == 0){
+        if (event.getData() == null || event.getData().length == 0) {
             dataOutput.writeInt(0);
-        }else{
+        } else {
             BitSet isNullIndex = isNullBitSet(event.getData());
             byte[] isNullBytes = isNullIndex.toByteArray();
             dataOutput.writeInt(isNullBytes.length);
             dataOutput.write(isNullBytes);
-            int i =0;
+            int i = 0;
             StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
-            if(definition == null) throw new IOException("StreamDefinition not found: "+event.getStreamId());
-            if(event.getData().length != definition.getColumns().size()){
-                throw new IOException("Event :"+event+" doesn't match with schema: "+definition);
+            if (definition == null) {
+                throw new IOException("StreamDefinition not found: " + event.getStreamId());
+            }
+            if (event.getData().length != definition.getColumns().size()) {
+                throw new IOException("Event :" + event + " doesn't match with schema: " + definition);
             }
-            for(StreamColumn column:definition.getColumns()){
-                if(!isNullIndex.get(i)) {
-                    Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i],dataOutput);
+            for (StreamColumn column : definition.getColumns()) {
+                if (!isNullIndex.get(i)) {
+                    Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i], dataOutput);
                 }
-                i ++;
+                i++;
             }
         }
     }
@@ -87,9 +86,9 @@ public class StreamEventSerializer implements Serializer<StreamEvent> {
     @Override
     public StreamEvent deserialize(DataInput dataInput) throws IOException {
         StreamEvent event = new StreamEvent();
-        String metaVersion_streamId = dataInput.readUTF();
-        String streamId = metaVersion_streamId.split("/")[1];
-        String metaVersion = metaVersion_streamId.split("/")[0];
+        String metaVersionStreamId = dataInput.readUTF();
+        String streamId = metaVersionStreamId.split("/")[1];
+        String metaVersion = metaVersionStreamId.split("/")[0];
         event.setStreamId(streamId);
         event.setMetaVersion(metaVersion);
 
@@ -101,11 +100,11 @@ public class StreamEventSerializer implements Serializer<StreamEvent> {
         BitSet isNullIndex = BitSet.valueOf(isNullBytes);
         Object[] attributes = new Object[definition.getColumns().size()];
         int i = 0;
-        for(StreamColumn column:definition.getColumns()){
-            if(!isNullIndex.get(i)) {
+        for (StreamColumn column : definition.getColumns()) {
+            if (!isNullIndex.get(i)) {
                 attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput);
             }
-            i ++;
+            i++;
         }
         event.setData(attributes);
         return event;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
index f35da39..6a47f1e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
@@ -16,39 +16,35 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.*;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
- * Don't serialize streamId
+ * Don't serialize streamId.
  *
  * @see StreamPartition
  */
 public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> {
-    public final static StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
+    public static final StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
 
-    private final Map<DigestBytes,StreamPartition> checkSumPartitionMap = new HashMap<>();
-    private final Map<StreamPartition,DigestBytes> partitionCheckSumMap = new HashMap<>();
+    private final Map<DigestBytes, StreamPartition> checkSumPartitionMap = new HashMap<>();
+    private final Map<StreamPartition, DigestBytes> partitionCheckSumMap = new HashMap<>();
 
     @Override
     public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
         DigestBytes checkSum = partitionCheckSumMap.get(partition);
-        if(checkSum == null){
+        if (checkSum == null) {
             try {
                 checkSum = digestCheckSum(partition);
-                partitionCheckSumMap.put(partition,checkSum);
-                checkSumPartitionMap.put(checkSum,partition);
+                partitionCheckSumMap.put(partition, checkSum);
+                checkSumPartitionMap.put(checkSum, partition);
             } catch (NoSuchAlgorithmException e) {
                 throw new IOException(e);
             }
@@ -63,8 +59,8 @@ public class StreamPartitionDigestSerializer implements Serializer<StreamPartiti
         byte[] checksum = new byte[checkSumLen];
         dataInput.readFully(checksum);
         StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum));
-        if(partition == null){
-            throw new IOException("Illegal partition checksum: "+checksum);
+        if (partition == null) {
+            throw new IOException("Illegal partition checksum: " + checksum);
         }
         return partition;
     }
@@ -72,7 +68,7 @@ public class StreamPartitionDigestSerializer implements Serializer<StreamPartiti
     private class DigestBytes {
         private final byte[] data;
 
-        public DigestBytes(byte[] bytes){
+        public DigestBytes(byte[] bytes) {
             this.data = bytes;
         }
 
@@ -85,10 +81,12 @@ public class StreamPartitionDigestSerializer implements Serializer<StreamPartiti
         public int hashCode() {
             return Arrays.hashCode(data);
         }
-        public int size(){
+
+        public int size() {
             return data.length;
         }
-        public byte[] toByteArray(){
+
+        public byte[] toByteArray() {
             return data;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
index 4105277..411368f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
@@ -16,38 +16,38 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
- * Don't serialize streamId
+ * Don't serialize streamId.
  *
  * @see StreamPartition
  */
 public class StreamPartitionSerializer implements Serializer<StreamPartition> {
-    public final static StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
+    public static final StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
 
     @Override
     public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(partition.getType().toString());
-        if(partition.getColumns() == null || partition.getColumns().size() == 0){
+        if (partition.getColumns() == null || partition.getColumns().size() == 0) {
             dataOutput.writeInt(0);
         } else {
             dataOutput.writeInt(partition.getColumns().size());
-            for(String column:partition.getColumns()){
+            for (String column : partition.getColumns()) {
                 dataOutput.writeUTF(column);
             }
         }
-        if(partition.getSortSpec() == null){
+        if (partition.getSortSpec() == null) {
             dataOutput.writeByte(0);
-        }else {
+        } else {
             dataOutput.writeByte(1);
             dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod());
             dataOutput.writeInt(partition.getSortSpec().getWindowMargin());
@@ -59,14 +59,14 @@ public class StreamPartitionSerializer implements Serializer<StreamPartition> {
         StreamPartition partition = new StreamPartition();
         partition.setType(StreamPartition.Type.locate(dataInput.readUTF()));
         int colSize = dataInput.readInt();
-        if(colSize>0){
+        if (colSize > 0) {
             List<String> columns = new ArrayList<>(colSize);
-            for(int i=0;i<colSize;i++){
+            for (int i = 0; i < colSize; i++) {
                 columns.add(dataInput.readUTF());
             }
             partition.setColumns(columns);
         }
-        if(dataInput.readByte() == 1){
+        if (dataInput.readByte() == 1) {
             String period = dataInput.readUTF();
             int margin = dataInput.readInt();
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
index 940024d..2a1541a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
index b705564..599f349 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
@@ -18,8 +18,7 @@
  */
 package org.apache.eagle.alert.engine.siddhi.extension;
 
-import java.util.LinkedList;
-
+import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.config.ExecutionPlanContext;
@@ -28,11 +27,10 @@ import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggrega
 import org.wso2.siddhi.query.api.definition.Attribute;
 import org.wso2.siddhi.query.api.definition.Attribute.Type;
 
-import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
 
 /**
- * @since Apr 1, 2016
- *
+ * @since Apr 1, 2016.
  */
 public class AttributeCollectAggregator extends AttributeAggregator {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
index 43400c7..101d05b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
@@ -16,8 +16,7 @@
  */
 package org.apache.eagle.alert.engine.siddhi.extension;
 
-import java.util.LinkedList;
-
+import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.wso2.siddhi.core.config.ExecutionPlanContext;
@@ -26,7 +25,7 @@ import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggrega
 import org.wso2.siddhi.query.api.definition.Attribute;
 import org.wso2.siddhi.query.api.definition.Attribute.Type;
 
-import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
 
 public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
 
@@ -73,11 +72,11 @@ public class AttributeCollectWithDistinctAggregator extends AttributeAggregator
 
     @Override
     public Object processAdd(Object arg0) {
-    	// AttributeAggregator.process is already synchronized
-    	if (value.contains(arg0)) {
-    		value.remove(arg0);
-    	}
-    	value.add(arg0);
+        // AttributeAggregator.process is already synchronized
+        if (value.contains(arg0)) {
+            value.remove(arg0);
+        }
+        value.add(arg0);
         if (LOG.isDebugEnabled()) {
             LOG.debug("processAdd: current values are : " + value);
         }
@@ -86,11 +85,11 @@ public class AttributeCollectWithDistinctAggregator extends AttributeAggregator
 
     @Override
     public Object processAdd(Object[] arg0) {
-    	// AttributeAggregator.process is already synchronized
-    	if (value.contains(arg0)) {
-    		value.remove(arg0);
-    	}
-    	value.add(arg0);
+        // AttributeAggregator.process is already synchronized
+        if (value.contains(arg0)) {
+            value.remove(arg0);
+        }
+        value.add(arg0);
         if (LOG.isDebugEnabled()) {
             LOG.debug("processAdd: current values are : " + value);
         }
@@ -120,5 +119,5 @@ public class AttributeCollectWithDistinctAggregator extends AttributeAggregator
         value.clear();
         return value;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
index 1bd24ed..27df63b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
@@ -31,16 +31,16 @@ public class ContainsIgnoreCaseExtension extends FunctionExecutor {
     @Override
     protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " +
-                    "but found " + attributeExpressionExecutors.length);
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, "
+                + "but found " + attributeExpressionExecutors.length);
         }
         if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, "
+                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, "
+                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
         }
     }
 
@@ -52,8 +52,8 @@ public class ContainsIgnoreCaseExtension extends FunctionExecutor {
         if (data[1] == null) {
             throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
         }
-        String str1 = (String)data[0];
-        String str2 = (String)data[1];
+        String str1 = (String) data[0];
+        String str2 = (String) data[1];
         return str1.toUpperCase().contains(str2.toUpperCase());
     }
 
@@ -79,7 +79,7 @@ public class ContainsIgnoreCaseExtension extends FunctionExecutor {
 
     @Override
     public Object[] currentState() {
-        return new Object[]{};
+        return new Object[] {};
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
index e99c4b9..1292e05 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
@@ -19,15 +19,11 @@ package org.apache.eagle.alert.engine.siddhi.extension;
 
 import org.wso2.siddhi.core.config.ExecutionPlanContext;
 import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
 import org.wso2.siddhi.core.executor.ExpressionExecutor;
 import org.wso2.siddhi.core.executor.function.FunctionExecutor;
 import org.wso2.siddhi.query.api.definition.Attribute;
 import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 public class EqualsIgnoreCaseExtension extends FunctionExecutor {
 
     Attribute.Type returnType = Attribute.Type.BOOL;
@@ -35,16 +31,16 @@ public class EqualsIgnoreCaseExtension extends FunctionExecutor {
     @Override
     protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " +
-                    "but found " + attributeExpressionExecutors.length);
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, "
+                + "but found " + attributeExpressionExecutors.length);
         }
         if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, "
+                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, "
+                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
         }
     }
 
@@ -56,8 +52,8 @@ public class EqualsIgnoreCaseExtension extends FunctionExecutor {
         if (data[1] == null) {
             throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
         }
-        String str1 = (String)data[0];
-        String str2 = (String)data[1];
+        String str1 = (String) data[0];
+        String str2 = (String) data[1];
         return str1.equalsIgnoreCase(str2);
     }
 
@@ -83,7 +79,7 @@ public class EqualsIgnoreCaseExtension extends FunctionExecutor {
 
     @Override
     public Object[] currentState() {
-        return new Object[]{};
+        return new Object[] {};
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
index d384d47..fe2280f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
@@ -44,18 +44,18 @@ public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
     @Override
     protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
         if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " +
-                    "but found " + attributeExpressionExecutors.length);
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, "
+                + "but found " + attributeExpressionExecutors.length);
         }
         if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, "
+                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
         }
         if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " +
-                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, "
+                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
         }
-        if(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor){
+        if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
             isRegexConstant = true;
             regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
             patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
@@ -76,7 +76,7 @@ public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
         }
         String source = (String) data[0];
 
-        if(!isRegexConstant){
+        if (!isRegexConstant) {
             regex = (String) data[1];
             pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
             matcher = pattern.matcher(source);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
index 38c5c30..0f75b6a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
@@ -16,18 +16,18 @@
  */
 package org.apache.eagle.alert.engine.sorter;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
- * TODO: Make sure thread-safe
- * TODO: Leverage Off-Heap Memory to persist append-only events collection
+ * TODO: Make sure thread-safe.
+ * TODO: Leverage Off-Heap Memory to persist append-only events collection.
  */
 public abstract class BaseStreamWindow implements StreamWindow {
     private final long endTime;
@@ -35,22 +35,17 @@ public abstract class BaseStreamWindow implements StreamWindow {
     private final long margin;
     private final AtomicBoolean expired;
     private final long createdTime;
-    private final static Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
+    private static final Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
     private PartitionedEventCollector collector;
     private final AtomicLong lastFlushedStreamTime;
     private final AtomicLong lastFlushedSystemTime;
 
-    /**
-     * @param startTime
-     * @param endTime
-     * @param marginTime
-     */
-    public BaseStreamWindow(long startTime, long endTime, long marginTime){
-        if(startTime >= endTime){
-            throw new IllegalArgumentException("startTime: "+startTime+" >= endTime: "+endTime+", expected: startTime < endTime");
+    public BaseStreamWindow(long startTime, long endTime, long marginTime) {
+        if (startTime >= endTime) {
+            throw new IllegalArgumentException("startTime: " + startTime + " >= endTime: " + endTime + ", expected: startTime < endTime");
         }
-        if(marginTime > endTime - startTime){
-            throw new IllegalArgumentException("marginTime: "+marginTime+" > endTime: "+endTime+" - startTime "+startTime+", expected: marginTime < endTime - startTime");
+        if (marginTime > endTime - startTime) {
+            throw new IllegalArgumentException("marginTime: " + marginTime + " > endTime: " + endTime + " - startTime " + startTime + ", expected: marginTime < endTime - startTime");
         }
         this.startTime = startTime;
         this.endTime = endTime;
@@ -63,8 +58,9 @@ public abstract class BaseStreamWindow implements StreamWindow {
 
     @Override
     public void register(PartitionedEventCollector collector) {
-        if(this.collector!=null)
+        if (this.collector != null) {
             throw new IllegalArgumentException("Duplicated collector error");
+        }
         this.collector = collector;
     }
 
@@ -78,7 +74,7 @@ public abstract class BaseStreamWindow implements StreamWindow {
     }
 
     @Override
-    public long rejectTime(){
+    public long rejectTime() {
         return this.lastFlushedStreamTime.get();
     }
 
@@ -93,7 +89,7 @@ public abstract class BaseStreamWindow implements StreamWindow {
 
     public boolean accept(final long eventTime) {
         return !expired() && eventTime >= startTime && eventTime < endTime
-                && eventTime >= lastFlushedStreamTime.get(); // dropped
+            && eventTime >= lastFlushedStreamTime.get(); // dropped
     }
 
     public boolean expired() {
@@ -106,35 +102,38 @@ public abstract class BaseStreamWindow implements StreamWindow {
     }
 
     /**
-     *
-     *
      * Expire when
      * 1) If stream time >= endTime + marginTime, then flush and expire
      * 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire.
      * 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire.
      * 4) else do nothing
      *
-     *  @param clock stream time clock
-     *  @param globalSystemTime system time clock
+     * @param clock            stream time clock
+     * @param globalSystemTime system time clock
      */
     @Override
-    public synchronized void onTick(StreamTimeClock clock,long globalSystemTime) {
-        if(!expired()) {
-            if(clock.getTime() >= endTime + margin){
-                LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+    public synchronized void onTick(StreamTimeClock clock, long globalSystemTime) {
+        if (!expired()) {
+            if (clock.getTime() >= endTime + margin) {
+                LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}", clock.getStreamId(),
+                    DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
                 lastFlushedStreamTime.set(clock.getTime());
                 lastFlushedSystemTime.set(globalSystemTime);
                 flush();
                 expired.set(true);
-            } else if(globalSystemTime - lastFlushedSystemTime.get() >=  endTime + margin - startTime && size() > 0){
-                LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+            } else if (globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0) {
+                LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}", clock.getStreamId(),
+                    DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),
+                    DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
                 lastFlushedStreamTime.set(clock.getTime());
                 lastFlushedSystemTime.set(globalSystemTime);
                 flush();
-                if(lastFlushedStreamTime.get()>=this.endTime) expired.set(true);
+                if (lastFlushedStreamTime.get() >= this.endTime) {
+                    expired.set(true);
+                }
             }
         } else {
-            LOG.warn("Window has already expired, should not tick: {}",this.toString());
+            LOG.warn("Window has already expired, should not tick: {}", this.toString());
         }
     }
 
@@ -150,7 +149,7 @@ public abstract class BaseStreamWindow implements StreamWindow {
 
     @Override
     public boolean equals(Object obj) {
-        if(obj !=null && obj instanceof BaseStreamWindow) {
+        if (obj != null && obj instanceof BaseStreamWindow) {
             BaseStreamWindow another = (BaseStreamWindow) obj;
             return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
         }
@@ -158,26 +157,27 @@ public abstract class BaseStreamWindow implements StreamWindow {
     }
 
     @Override
-    public void flush(){
-        if(this.collector == null) throw new NullPointerException("Collector is not given before window flush");
+    public void flush() {
+        if (this.collector == null) {
+            throw new NullPointerException("Collector is not given before window flush");
+        }
         this.flush(collector);
     }
 
     /**
-     *
-     * @param collector
-     * @return max timestamp
+     * Flushes this window to the given collector.
+     * @param collector PartitionedEventCollector.
      */
     protected abstract void flush(PartitionedEventCollector collector);
 
     @Override
     public String toString() {
         return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
-                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
-                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
-                this.margin,
-                size(),
-                this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime): DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
-                );
+            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
+            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
+            this.margin,
+            size(),
+            this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime) : DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
+        );
     }
 }
\ No newline at end of file