You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/07 00:53:35 UTC

[5/6] incubator-eagle git commit: Rebase code base

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
index 2bca329..57529b6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
@@ -19,6 +19,10 @@
 
 package org.apache.eagle.alert.engine;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
 import org.apache.eagle.alert.config.ZKConfig;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
@@ -40,19 +44,43 @@ import com.typesafe.config.ConfigFactory;
  */
 public class UnitTopologyMain {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws Exception {
+        // command line parse
+        Options options = new Options();
+        options.addOption("c", true,
+                "config URL (valid file name) - defaults application.conf according to typesafe config default behavior.");
+        CommandLineParser parser = new DefaultParser();
+        CommandLine cmd = parser.parse(options, args);
+
+        if (cmd.hasOption("c")) {
+            String fileName = cmd.getOptionValue("c", "application.conf");
+            System.setProperty("config.resource", fileName.startsWith("/") ? fileName : "/" + fileName);
+            ConfigFactory.invalidateCaches();
+        }
         Config config = ConfigFactory.load();
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        String topologyId = config.getString("topology.name");
-        ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
 
+        // load config and start
+        String topologyId = config.getString("topology.name");
+        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
         new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
     }
+    
+    public static void runTopology(Config config, backtype.storm.Config stormConfig) {
+        // resolve the topology name, build its ZK notify service, then run with the supplied storm config
+        final String name = config.getString("topology.name");
+        UnitTopologyRunner runner = new UnitTopologyRunner(createZKNotifyService(config, name), stormConfig);
+        runner.run(name, config);
+    }
 
-    public static StormTopology createTopology(Config config) {
+    private static ZKMetadataChangeNotifyService createZKNotifyService(Config config, String topologyId) {
         ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        String topologyId = config.getString("topology.name");
         ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
+        return changeNotifyService;
+    }
+    
+    public static StormTopology createTopology(Config config) {
+        String topologyId = config.getString("topology.name");
+        ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
 
         return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index 2aa70e8..e8f736c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -20,13 +20,17 @@ import java.util.Map;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
 
 public class PolicyStreamHandlers {
     public static final String SIDDHI_ENGINE ="siddhi";
+    public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
 
     public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
         if(SIDDHI_ENGINE.equals(type)) {
             return new SiddhiPolicyHandler(sds);
+        }else if(NO_DATA_ALERT_ENGINE.equals(type)){
+            return new NoDataPolicyHandler(sds);
         }
         throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..8a681da
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Since 6/28/16.
+ * to get distinct values within a specified time window
+ * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
+ * timeSortedMap : map sorted by timestamp first and then value
+ * With the above 2 data structure, we can get distinct values in LOG(N)
+ */
+public class DistinctValuesInTimeWindow {
+    public static class ValueAndTime{
+        Object value;
+        long timestamp;
+        public ValueAndTime(Object value, long timestamp){
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+
+        public String toString(){
+            return "[" + value + "," + timestamp + "]";
+        }
+
+        public int hashCode(){
+            return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
+        }
+
+        public boolean equals(Object that){
+            if(!(that instanceof ValueAndTime))
+                return false;
+            ValueAndTime another = (ValueAndTime)that;
+            return another.timestamp == this.timestamp && another.value.equals(this.value);
+        }
+    }
+
+    public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
+        @Override
+        public int compare(ValueAndTime o1, ValueAndTime o2) {
+            if(o1.timestamp != o2.timestamp)
+                return (o1.timestamp > o2.timestamp) ? 1 : -1;
+            if(o1.value.equals(o2.value))
+                return 0;
+            else {
+                // this is not strictly correct, but I don't want to write too many comparators here :-)
+                if(o1.hashCode() > o2.hashCode())
+                    return 1;
+                else
+                    return -1;
+            }
+        }
+    }
+
+    /**
+     * map from value to max timestamp for this value
+     */
+    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+    /**
+     * map sorted by time(max timestamp for the value) and then value
+     */
+    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
+    private long maxTimestamp = 0L;
+    private long window;
+    private boolean windowSlided;
+
+    /**
+     * @param window - milliseconds
+     */
+    public DistinctValuesInTimeWindow(long window){
+        this.window = window;
+    }
+
+    public void send(Object value, long timestamp){
+        ValueAndTime vt = new ValueAndTime(value, timestamp);
+
+        // todo think of time out of order
+        if(valueMaxTimeMap.containsKey(value)){
+            // remove that entry with old timestamp in timeSortedMap
+            long oldTime = valueMaxTimeMap.get(value);
+            if(oldTime >= timestamp){
+                // no any effect as the new timestamp is equal or even less than old timestamp
+                return;
+            }
+            timeSortedMap.remove(new ValueAndTime(value, oldTime));
+        }
+        // insert entry with new timestamp in timeSortedMap
+        timeSortedMap.put(vt, vt);
+        // update new timestamp in valueMaxTimeMap
+        valueMaxTimeMap.put(value, timestamp);
+
+        // evict old entries
+        // store max timestamp if possible
+        maxTimestamp = Math.max(maxTimestamp, timestamp);
+
+        // check if some values should be evicted because of time window
+        Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
+        while(it.hasNext()){
+            Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
+            if(entry.getKey().timestamp < maxTimestamp - window){
+                // should remove the entry in valueMaxTimeMap and timeSortedMap
+                valueMaxTimeMap.remove(entry.getKey().value);
+                windowSlided = true;
+
+                it.remove();
+            }else {
+                break;
+            }
+        }
+    }
+
+    public Map<Object, Long> distinctValues(){
+        return valueMaxTimeMap;
+    }
+
+    public boolean windowSlided(){
+        return windowSlided;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index f063618..8a1f04a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -90,7 +90,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
                     policyStreamHandler.getValue().send(partitionedEvent.getEvent());
                 } catch (Exception e) {
                     this.context.counter().scope("fail_count").incr();
-                    LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent());
+                    LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index dfa5612..ed26408 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -76,7 +76,9 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
          */
         @Override
         public void receive(Event[] events) {
-            LOG.info("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId());
+            }
             for(Event e : events) {
                 AlertStreamEvent event = new AlertStreamEvent();
                 event.setTimestamp(e.getTimestamp());
@@ -131,6 +133,10 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
         if(inputHandler != null){
             context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr();
             inputHandler.send(event.getTimestamp(),event.getData());
+            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("sent event to siddhi stream {} ", streamId);
+            }
         }else{
             context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr();
             LOG.warn("No input handler found for stream {}",streamId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
new file mode 100644
index 0000000..ed13f71
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Since 6/28/16.
+ * No Data Policy engine
+ * based on the following information
+ * 1. stream definition: group by columns
+ * 2. timestamp field: timestamp column
+ * 3. wiri safe time window: how long window is good for full set of wiri
+ * 4. wisb: full set
+ *
+ * No data policy definition should include
+ * fixed fields and dynamic fields
+ * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
+ * dynamic fields depend on wisb type.
+ */
+public class NoDataPolicyHandler implements PolicyStreamHandler{
+    private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
+    private Map<String, StreamDefinition> sds;
+
+    // wisb (what it should be): the expected full set of values of the watched columns
+    @SuppressWarnings("rawtypes")
+    private volatile Set wisbValues = null;
+    private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
+    // reuse PolicyDefinition.definition.value field to store full set of values separated by comma
+    private volatile PolicyDefinition policyDef;
+    private volatile DistinctValuesInTimeWindow distinctWindow;
+    private volatile Collector<AlertStreamEvent> collector;
+    private volatile PolicyHandlerContext context;
+    private volatile NoDataWisbType wisbType;
+
+    public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
+        this.sds = sds;
+    }
+    /** Reads the policy value (windowPeriod,type,numOfFields,field names,field values) and sets up the window. */
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+        this.context = context;
+        this.policyDef = context.getPolicyDefinition();
+        List<String> inputStreams = policyDef.getInputStreams();
+        // validate inputStreams has to contain only one stream
+        if(inputStreams.size() != 1)
+            throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
+        // validate outputStream has to contain only one stream
+        if(policyDef.getOutputStreams().size() != 1)
+            throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
+
+        StreamDefinition sd = sds.get(inputStreams.get(0));
+        String policyValue = policyDef.getDefinition().getValue();
+        String[] segments = policyValue.split(",");
+        if(segments.length < 3)
+            throw new IllegalArgumentException("no data alert policy value needs windowPeriod,type,numOfFields but was: " + policyValue);
+        long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
+        distinctWindow = new DistinctValuesInTimeWindow(windowPeriod);
+        this.wisbType = NoDataWisbType.valueOf(segments[1]);
+        // for provided wisb values, need to parse, for dynamic wisb values, it is computed through a window
+        if(wisbType == NoDataWisbType.provided) {
+            wisbValues = new NoDataWisbProvidedParser().parse(segments);
+        }
+        // collect indices in a fresh list so that calling prepare() again does not append duplicates
+        int numOfFields = Integer.parseInt(segments[2]);
+        List<Integer> indices = new ArrayList<>();
+        for(int i = 3; i < 3+numOfFields; i++){
+            indices.add(sd.getColumnIndex(segments[i]));
+        }
+        wisbFieldIndices = indices;
+    }
+
+    /** Feeds the watched column values into the window; once the window has slid, checks wiri against wisb. */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void send(StreamEvent event) throws Exception {
+        Object[] data = event.getData();
+        List<Object> columnValues = new ArrayList<>();
+        for(int index : wisbFieldIndices){
+            // convert value to string
+            columnValues.add(data[index].toString());
+        }
+        distinctWindow.send(columnValues, event.getTimestamp());
+        Set wiriValues = distinctWindow.distinctValues().keySet();
+
+        LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues());
+        if(distinctWindow.windowSlided()) {
+            compareAndEmit(wisbValues, wiriValues, event);
+        }
+
+        if(wisbType == NoDataWisbType.dynamic) {
+            // copy, because the key set is a view that changes as the window moves
+            wisbValues = new HashSet<>(wiriValues);
+        }
+    }
+
+    /** Emits one alert built from the triggering event when some wisb value is absent from wiri. */
+    @SuppressWarnings("rawtypes")
+    private void compareAndEmit(Set wisb, Set wiri, StreamEvent event){
+        Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
+        // parameterized, so the collections are not rendered to text unless debug is enabled
+        LOG.debug("nodatavalues:{}, wisb: {}, wiri: {}", noDataValues, wisb, wiri);
+        if (noDataValues != null && noDataValues.size() > 0) {
+            LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisbValues);
+            AlertStreamEvent alertEvent = createAlertEvent(event.getTimestamp(), event.getData());
+            collector.emit(alertEvent);
+        }
+    }
+
+    /** Builds the alert for the policy's output stream, carrying the trigger data and the input stream's definition. */
+    private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent){
+        StreamDefinition sd = sds.get(policyDef.getInputStreams().get(0));
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setTimestamp(timestamp);
+        event.setData(triggerEvent);
+        event.setStreamId(policyDef.getOutputStreams().get(0));
+        event.setPolicy(context.getPolicyDefinition());
+        if (this.context.getParentEvaluator() != null) {
+            event.setCreatedBy(context.getParentEvaluator().getName());
+        }
+        event.setCreatedTime(System.currentTimeMillis());
+        event.setSchema(sd);
+        return event;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // nothing to release
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
new file mode 100644
index 0000000..fe06067
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Since 6/29/16.
+ */
+public interface NoDataWisbParser {
+    /**
+     * parse policy definition and return WISB values for one or multiple fields
+     * for example host and data center are 2 fields for no data alert, then each WISB entry is a list of two values
+     * @param args some information parsed from policy definition
+     * @return set of list of field values
+     */
+    Set<List<String>> parse(String[] args);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
new file mode 100644
index 0000000..e13826a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Since 6/29/16.
+ */
+public class NoDataWisbProvidedParser implements NoDataWisbParser{
+    /**
+     * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value"
+     * @throws IllegalArgumentException if values are present but numOfFields is not positive or the last tuple is incomplete
+     */
+    @Override
+    public Set<List<String>> parse(String[] args) {
+        int numOfFields = Integer.parseInt(args[2]);
+        int numOfValues = args.length - 3 - numOfFields;
+        // a non-positive field count would keep the loop from terminating; a partial last tuple would read past the array
+        if(numOfValues > 0 && (numOfFields <= 0 || numOfValues % numOfFields != 0))
+            throw new IllegalArgumentException("invalid no data alert policy: numOfFields=" + numOfFields + ", segments=" + args.length);
+        Set<List<String>> wisbValues = new HashSet<>();
+        for(int i = 3 + numOfFields; i < args.length; i += numOfFields){
+            List<String> fields = new ArrayList<>(java.util.Arrays.asList(args).subList(i, i + numOfFields));
+            wisbValues.add(fields);
+        }
+        return wisbValues;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
new file mode 100644
index 0000000..887d099
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+/**
+ * Since 6/29/16.
+ */
+public enum NoDataWisbType {
+    provided, // wisb values are parsed from the policy value
+    dynamic   // wisb values are computed through a window
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
index 644fe2b..d24bdb0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
@@ -17,25 +17,29 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
+import java.io.Closeable;
+import java.util.Map;
+
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
 
 import com.typesafe.config.Config;
 
-import java.io.Closeable;
-import java.util.Map;
-
 /**
  * Created on 2/10/16.
  * Notification Plug-in interface which provide abstraction layer to notify to different system
  */
 public interface AlertPublishPlugin extends Closeable {
     /**
-     * for initialization
+     * 
+     * @param config
+     * @param publishment
+     * @param configProperties - storm config that would be useful for some implementation
      * @throws Exception
      */
-    void init(Config config, Publishment publishment) throws Exception;
+    @SuppressWarnings("rawtypes")
+    void init(Config config, Publishment publishment, Map configProperties) throws Exception;
 
     void update(String dedupIntervalMin, Map<String, String> pluginProperties);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
index 7a44009..5c0e597 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
@@ -2,6 +2,7 @@ package org.apache.eagle.alert.engine.publisher;
 
 
 import java.io.Serializable;
+import java.util.Map;
 
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 
@@ -24,7 +25,8 @@ import com.typesafe.config.Config;
  * limitations under the License.
  */
 public interface AlertPublisher extends AlertPublishListener, Serializable {
-    void init(Config config);
+    @SuppressWarnings("rawtypes")
+    void init(Config config, Map stormConfig);
     String getName();
     void nextEvent(AlertStreamEvent event);
     void close();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
new file mode 100644
index 0000000..bd21415
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.slf4j.Logger;
+
+import com.typesafe.config.Config;
+
+/**
+ * Common base for {@link AlertPublishPlugin} implementations. It keeps the deduplicator, the last publish
+ * status, the event serializer and the publishment name, and implements the operations that only need them.
+ *
+ * @since Jun 3, 2016
+ */
+public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
+
+    /** Removes duplicated alerts; created in {@link #init} from the publishment's dedup interval. */
+    protected AlertDeduplicator deduplicator;
+    /** Status returned by {@link #getStatus()}; this class only reads it, subclasses assign it. */
+    protected PublishStatus status;
+    /** Serializer chosen in {@link #init}; never {@code null} once init has returned. */
+    protected IEventSerializer serializer;
+    /** Name of the publishment this plug-in was initialized with. */
+    protected String pubName;
+
+    /**
+     * Creates the deduplicator, records the publishment name and instantiates the event serializer.
+     *
+     * @param config application configuration; not used by this base implementation
+     * @param publishment source of the dedup interval, the name and the serializer class name
+     * @param conf map handed to the serializer's {@code (Map)} constructor
+     * @throws Exception declared for subclasses; serializer problems are logged and never propagated
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void init(Config config, Publishment publishment, Map conf) throws Exception {
+        this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+        this.pubName = publishment.getName();
+        this.serializer = createSerializer(publishment, conf);
+    }
+
+    /**
+     * Instantiates the serializer named by the publishment through its {@code (Map)} constructor, falling back
+     * to {@link StringEventSerializer} when none is named or the named class cannot be used.
+     */
+    @SuppressWarnings("rawtypes")
+    private IEventSerializer createSerializer(Publishment publishment, Map conf) {
+        String serializerClz = publishment.getSerializer();
+        if (serializerClz == null || serializerClz.trim().isEmpty()) {
+            // No serializer configured: the default is the intended choice, so do not report an error.
+            return new StringEventSerializer(conf);
+        }
+        try {
+            Object obj = Class.forName(serializerClz).getConstructor(Map.class).newInstance(conf);
+            if (!(obj instanceof IEventSerializer)) {
+                throw new Exception(String.format("serializer %s of publishment %s is not subclass of %s!",
+                        serializerClz,
+                        publishment.getName(),
+                        IEventSerializer.class.getName()));
+            }
+            return (IEventSerializer) obj;
+        } catch (Exception e) {
+            // String.format only understands %s-style specifiers, not the {} placeholder of SLF4J.
+            getLogger().error(String.format(
+                    "initialized failed, use default StringEventSerializer, failure message : %s", e.getMessage()), e);
+            return new StringEventSerializer(conf);
+        }
+    }
+
+    /** Applies the new dedup interval; {@code pluginProperties} is ignored here and left to subclasses. */
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+    }
+
+    /** Delegates to the deduplicator and returns whatever it returns for {@code event}. */
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return deduplicator.dedup(event);
+    }
+
+    /** Returns the current {@link #status} field, which is {@code null} until a subclass assigns it. */
+    @Override
+    public PublishStatus getStatus() {
+        return status;
+    }
+
+    /** Supplies the logger of the concrete plug-in so that messages are attributed to it. */
+    protected abstract Logger getLogger();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
deleted file mode 100644
index 2a4e332..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * Alert API entity Persistor
- */
-public class AlertEagleStorePersister {
-	private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
-	private Config config;
-
-	public AlertEagleStorePersister(Config config) {
-		this.config = config;
-	}
-
-	/**
-	 * Persist passes list of Entities
-	 * @param list
-	 * @return
-     */
-	public boolean doPersist(List<? extends StreamEvent> list) {
-		if (list.isEmpty()) return false;
-		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
-		try {
-			IMetadataServiceClient client = new MetadataServiceClientImpl(config);
-			// TODO: metadata service support
-		}
-		catch (Exception ex) {
-			LOG.error("Got an exception in persisting entities", ex);
-			return false;
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
deleted file mode 100644
index 807aacc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * Plugin to persist alerts to Eagle Storage
- */
-public class AlertEagleStorePublisher implements AlertPublishPlugin {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePublisher.class);
-    private PublishStatus status;
-    private AlertEagleStorePersister persist;
-    private AlertDeduplicator deduplicator;
-
-    @Override
-    public void init(Config config, Publishment publishment) throws Exception {
-        this.persist = new AlertEagleStorePersister(config);
-        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
-        LOG.info("initialized plugin for EagleStorePlugin");
-    }
-
-    @Override
-    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-    }
-
-    @Override
-    public PublishStatus getStatus() {
-        return this.status;
-    }
-
-    @Override
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
-        return deduplicator.dedup(event);
-    }
-
-    /**
-     * Persist AlertEntity to alert_details table
-     * @param event
-     */
-    @Override
-    public void onAlert(AlertStreamEvent event) {
-        LOG.info("write alert to eagle storage " + event);
-        event = dedup(event);
-        if(event == null) {
-            return;
-        }
-        PublishStatus status = new PublishStatus();
-        try{
-            boolean result = persist.doPersist(Arrays.asList(event));
-            if(result) {
-                status.successful = true;
-                status.errorMessage = "";
-            }else{
-                status.successful = false;
-                status.errorMessage = "";
-            }
-        }catch (Exception ex ){
-            status.successful = false;
-            status.errorMessage = ex.getMessage();
-            LOG.error("Fail writing alert entity to Eagle Store", ex);
-        }
-        this.status = status;
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public int hashCode(){
-        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o){
-        if(o == this)
-            return true;
-        if(!(o instanceof AlertEagleStorePublisher))
-            return false;
-        return true;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index efe29bc..9d191c0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -27,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
@@ -37,27 +35,28 @@ import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
-public class AlertEmailPublisher implements AlertPublishPlugin {
+public class AlertEmailPublisher extends AbstractPublishPlugin {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
-    private AlertEmailGenerator emailGenerator;
-    private AlertDeduplicator deduplicator;
-    private Map<String, String> emailConfig;
     private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
     private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
     private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+
+    private AlertEmailGenerator emailGenerator;
+    private Map<String, String> emailConfig;
+
     private transient ThreadPoolExecutor executorPool;
-    private PublishStatus status;
 
     @Override
-    public void init(Config config, Publishment publishment) throws Exception {
+    @SuppressWarnings("rawtypes")
+    public void init(Config config, Publishment publishment, Map conf) throws Exception {
+        super.init(config, publishment, conf);
         executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         LOG.info(" Creating Email Generator... ");
         if (publishment.getProperties() != null) {
             emailConfig = new HashMap<>(publishment.getProperties());
             emailGenerator = createEmailGenerator(emailConfig);
         }
-        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
     }
 
     @Override
@@ -84,7 +83,8 @@ public class AlertEmailPublisher implements AlertPublishPlugin {
 
     @Override
     public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-        if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) {
+        super.update(dedupIntervalMin, pluginProperties);
+        // emailConfig is still null if init() saw no publishment properties, so compare from the non-null side.
+        if (pluginProperties != null && ! pluginProperties.equals(emailConfig)) {
             emailConfig = new HashMap<>(pluginProperties);
             emailGenerator = createEmailGenerator(pluginProperties);
@@ -96,16 +96,6 @@ public class AlertEmailPublisher implements AlertPublishPlugin {
         this.executorPool.shutdown();
     }
 
-    @Override
-    public PublishStatus getStatus() {
-        return this.status;
-    }
-
-    @Override
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
-        return deduplicator.dedup(event);
-    }
-
     /**
      * @param notificationConfig
      * @return
@@ -148,4 +138,9 @@ public class AlertEmailPublisher implements AlertPublishPlugin {
             return false;
         return true;
     }
+
+    @Override
+    protected Logger getLogger() {
+        return LOG;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index ea65298..2566f79 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -36,30 +34,30 @@ import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
-public class AlertKafkaPublisher implements AlertPublishPlugin {
+public class AlertKafkaPublisher extends AbstractPublishPlugin {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
-    private AlertDeduplicator deduplicator;
-    private PublishStatus status;
+    private static final long MAX_TIMEOUT_MS = 60000;
+
     @SuppressWarnings("rawtypes")
     private KafkaProducer producer;
     private String brokerList;
     private String topic;
 
-    private final static long MAX_TIMEOUT_MS =60000;
-
     @Override
-    public void init(Config config, Publishment publishment) throws Exception {
-        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+    @SuppressWarnings("rawtypes")
+    public void init(Config config, Publishment publishment, Map conf) throws Exception {
+        super.init(config, publishment, conf);
+
         if (publishment.getProperties() != null) {
             Map<String, String> kafkaConfig = new HashMap<>(publishment.getProperties());
             brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
-            producer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+            producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig);
             topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
         }
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void onAlert(AlertStreamEvent event) throws Exception {
         if (producer == null) {
@@ -72,7 +70,12 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
         }
         PublishStatus status = new PublishStatus();
         try {
-            Future<?> future = producer.send(createRecord(event, topic));
+            ProducerRecord record = createRecord(event, topic);
+            if (record == null) {
+                LOG.error(" Alert serialize return null, ignored message! ");
+                return;
+            }
+            Future<?> future = producer.send(record);
             future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             status.successful = true;
             status.errorMessage = "";
@@ -89,6 +92,7 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
         this.status = status;
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
         deduplicator.setDedupIntervalMin(dedupIntervalMin);
@@ -99,7 +103,7 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
             brokerList = newBrokerList;
             KafkaProducer newProducer = null;
             try {
-                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties);
             } catch (Exception e) {
                 LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
             }
@@ -113,24 +117,25 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
         producer.close();
     }
 
-    /**
-     * To Create  KafkaProducer Record
-     * @param event
-     * @return ProducerRecord
-     * @throws Exception
-     */
-    private ProducerRecord<String, String> createRecord(AlertStreamEvent event, String topic) throws Exception {
-        ProducerRecord<String, String>  record  = new ProducerRecord<>(topic, event.toString());
-        return record;
+    /**
+     * Wraps the serialized form of {@code event} in a record for {@code topic}.
+     *
+     * @return the record, or {@code null} when the serializer produced {@code null}
+     */
+    private ProducerRecord<String, Object> createRecord(AlertStreamEvent event, String topic) throws Exception {
+        Object payload = serializeEvent(event);
+        if (payload == null) {
+            return null;
+        }
+        return new ProducerRecord<>(topic, payload);
     }
 
-    @Override
-    public PublishStatus getStatus() {
-        return this.status;
+    private Object serializeEvent(AlertStreamEvent event) {
+        return serializer.serialize(event);
     }
 
     @Override
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
-        return this.deduplicator.dedup(event);
+    protected Logger getLogger() {
+        return LOG;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
index f538088..82be5a0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import java.util.Map;
+
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.slf4j.Logger;
@@ -32,12 +34,13 @@ public class AlertPublishPluginsFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class);
 
-    public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config) {
+    @SuppressWarnings("rawtypes")
+    public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config, Map conf) {
         AlertPublishPlugin plugin = null;
         String publisherType = publishment.getType();
         try {
             plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance();
-            plugin.init(config, publishment);
+            plugin.init(config, publishment, conf);
         } catch (Exception ex) {
             LOG.error("Error in loading AlertPublisherPlugin class: ", ex);
             //throw new IllegalStateException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index fce22f1..6baa616 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
+@SuppressWarnings("rawtypes")
 public class AlertPublisherImpl implements AlertPublisher {
     private static final long serialVersionUID = 4809983246198138865L;
     private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
@@ -40,14 +41,16 @@ public class AlertPublisherImpl implements AlertPublisher {
     private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1);
     private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
     private Config config;
+    private Map conf;
 
     public AlertPublisherImpl(String name) {
         this.name = name;
     }
 
     @Override
-    public void init(Config config) {
+    public void init(Config config, Map conf) {
         this.config = config;
+        this.conf = conf;
     }
 
     @Override
@@ -84,6 +87,7 @@ public class AlertPublisherImpl implements AlertPublisher {
         publishPluginMapping.values().forEach(plugin -> plugin.close());
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void onPublishChange(List<Publishment> added,
                                 List<Publishment> removed,
@@ -100,7 +104,7 @@ public class AlertPublisherImpl implements AlertPublisher {
         }
 
         for (Publishment publishment : added) {
-            AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config);
+            AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
             if(plugin != null) {
                 publishPluginMapping.put(publishment.getName(), plugin);
                 onPolicyAdded(publishment.getPolicyIds(), publishment.getName());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
index e8964a8..6b7fc61 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
@@ -17,26 +17,52 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 
 /**
- * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
+ * Creates the Kafka producers used by the alert publishers. Serializer and deserializer class names given in
+ * the supplied Kafka properties take precedence over the String based defaults.
+ * <p>
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than
+ * having multiple instances. Note that every {@link #getProducer} call builds a new instance.
  */
 public class KafkaProducerManager {
-	public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
-
-	public KafkaProducer<String, Object> getProducer(String brokerList) {
-		Properties configMap = new Properties();
-		configMap.put("bootstrap.servers", brokerList);
-		configMap.put("metadata.broker.list", brokerList);
-		configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		configMap.put("request.required.acks", "1");	     
-		configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-		configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-		KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
-		return producer;
-	}
+
+    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+    private static final String VALUE_DESERIALIZER = "value.deserializer";
+    private static final String KEY_DESERIALIZER = "key.deserializer";
+    private static final String VALUE_SERIALIZER = "value.serializer";
+    private static final String KEY_SERIALIZER = "key.serializer";
+
+    public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
+
+    /**
+     * Builds a new producer for the given brokers.
+     *
+     * @param brokerList value used for both {@code bootstrap.servers} and {@code metadata.broker.list}
+     * @param kafkaConfig optional overrides for the (de)serializer class names; may be {@code null}
+     * @return a newly created producer
+     */
+    public KafkaProducer<String, Object> getProducer(String brokerList, Map<String, String> kafkaConfig) {
+        Properties configMap = new Properties();
+        configMap.put("bootstrap.servers", brokerList);
+        configMap.put("metadata.broker.list", brokerList);
+        configMap.put(KEY_SERIALIZER, valueOrDefault(kafkaConfig, KEY_SERIALIZER, STRING_SERIALIZER));
+        configMap.put(VALUE_SERIALIZER, valueOrDefault(kafkaConfig, VALUE_SERIALIZER, STRING_SERIALIZER));
+        configMap.put("request.required.acks", "1");
+        configMap.put(KEY_DESERIALIZER, valueOrDefault(kafkaConfig, KEY_DESERIALIZER, STRING_DESERIALIZER));
+        configMap.put(VALUE_DESERIALIZER, valueOrDefault(kafkaConfig, VALUE_DESERIALIZER, STRING_DESERIALIZER));
+
+        return new KafkaProducer<>(configMap);
+    }
+
+    /** Returns the non-null value configured for {@code key}, or {@code defaultValue} when there is none. */
+    private static String valueOrDefault(Map<String, String> kafkaConfig, String key, String defaultValue) {
+        String configured = kafkaConfig == null ? null : kafkaConfig.get(key);
+        return configured != null ? configured : defaultValue;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
new file mode 100644
index 0000000..012ebaa
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+/**
+ * {@link IEventSerializer} that serializes an {@link AlertStreamEvent} to its {@code toString()} value.
+ * @since Jun 3, 2016
+ */
+public class StringEventSerializer implements IEventSerializer {
+    
+    @SuppressWarnings("rawtypes")
+    public StringEventSerializer(Map stormConf) { // stormConf is accepted but not used by this serializer
+    }
+
+    @Override
+    public Object serialize(AlertStreamEvent event) {
+        return event.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index db235a7..28d2d22 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -57,7 +57,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
     private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
     private final OutputCollector outputCollector;
     private final Object outputLock = new Object();
-    private final List<String> outputStreamIds;
+//    private final List<String> outputStreamIds;
     private final StreamContext streamContext;
     private final PartitionedEventSerializer serializer;
     private volatile Map<StreamPartition,StreamRouterSpec> routeSpecMap;
@@ -69,7 +69,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         this.outputCollector = outputCollector;
         this.routeSpecMap = new HashMap<>();
         this.routePartitionerMap = new HashMap<>();
-        this.outputStreamIds = outputStreamIds;
+//        this.outputStreamIds = outputStreamIds;
         this.streamContext = streamContext;
         this.serializer = serializer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index cc819ba..131d85a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -40,6 +40,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
     private Config config;
     private List<String> outputStreamIds;
     protected OutputCollector collector;
+    protected Map stormConf;
 
     public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){
         this.changeNotifyService = changeNotifyService;
@@ -56,6 +57,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.stormConf = stormConf;
         Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet");
         this.collector = collector;
         internalPrepare(collector,this.changeNotifyService,this.config,context);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 30ff5f0..e53b0ba 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -134,7 +134,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     public void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) {
         List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
         if(newPolicies == null) {
-            LOG.info("no policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
+            LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 0a239e2..768cf48 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -53,13 +53,13 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
     public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){
         super(coordinatorService, config);
         this.alertPublisher = alertPublisher;
-        this.alertPublisher.init(config);
     }
 
     @Override
     public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) {
         coordinatorService.registerListener(this);
         coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
+        this.alertPublisher.init(config, stormConf);
         streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 942ef97..85c2f73 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -16,18 +16,26 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.router.StreamRouter;
 import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
@@ -39,8 +47,12 @@ import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+
+import com.typesafe.config.Config;
 
 public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
     private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index cef94c7..5a937f2 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
 import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
@@ -67,11 +68,17 @@ public class UnitTopologyRunner {
     public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
 
     private final IMetadataChangeNotifyService metadataChangeNotifyService;
+    private backtype.storm.Config givenStormConfig = null;
 
     public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){
         this.metadataChangeNotifyService = metadataChangeNotifyService;
     }
 
+    public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, backtype.storm.Config stormConfig) {
+        this(changeNotifyService);
+        this.givenStormConfig = stormConfig;
+    }
+
     public StormTopology buildTopology(String topologyId,
                               int numOfSpoutTasks,
                               int numOfRouterBolts,
@@ -148,7 +155,7 @@ public class UnitTopologyRunner {
         return builder.createTopology();
     }
 
-    public void run(String topologyId,
+    private void run(String topologyId,
                     int numOfTotalWorkers,
                     int numOfSpoutTasks,
                     int numOfRouterBolts,
@@ -156,7 +163,8 @@ public class UnitTopologyRunner {
                     int numOfPublishTasks,
                     Config config,
                     boolean localMode) {
-        backtype.storm.Config stormConfig = new backtype.storm.Config();
+
+        backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig;
         // TODO: Configurable metric consumer instance number
 
         int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
@@ -186,11 +194,6 @@ public class UnitTopologyRunner {
         }
     }
 
-    public void run(Config config) {
-        String topologyId = config.getString("topology.name");
-        run(topologyId,config);
-    }
-
     public void run(String topologyId,Config config) {
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
index a3487d3..c1da90f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
@@ -40,7 +40,8 @@ public class JsonScheme implements Scheme {
 
     private String topic;
 
-    public JsonScheme(String topic) {
+    @SuppressWarnings("rawtypes")
+    public JsonScheme(String topic, Map conf) {
         this.topic = topic;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
index 89d2e76..194b0c2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
@@ -41,7 +41,8 @@ public class PlainStringScheme implements Scheme {
     private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class);
     private String topic;
 
-    public PlainStringScheme(String topic){
+    @SuppressWarnings("rawtypes")
+    public PlainStringScheme(String topic, Map conf){
         this.topic = topic;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
index 5ba1080..03c1dfb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
@@ -16,16 +16,16 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
 import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
 /**
  * TODO: Seams the complexity dosen't bring enough performance improve
  *

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
index c518e40..f653361 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
@@ -16,10 +16,10 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
 public interface PartitionedEventSerializer {
     /**
      *

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
index a94604c..6be8f1a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
@@ -16,12 +16,19 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.serialization.impl.*;
-
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.serialization.impl.BooleanSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.DoubleSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.FloatSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.IntegerSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.JavaObjectSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.LongSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
+import org.apache.eagle.alert.engine.serialization.impl.StringSerializer;
+
 public class Serializers {
     private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
 
@@ -32,6 +39,7 @@ public class Serializers {
         COLUMN_TYPE_SER_MAPPING.put(type,serializer);
     }
 
+    @SuppressWarnings("unchecked")
     public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){
         if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
             return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
index 1e90569..db91a70 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
index ad5f53c..f2f5359 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with