You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/07/28 11:22:08 UTC

incubator-eagle git commit: EAGLE-401: StreamRouterBolt and PublishBolt also have NPE

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 7e9dfaf3d -> 994a1e584


EAGLE-401: StreamRouterBolt and PublishBolt also have NPE

make all stream bolts avoid NPE exception, instead throw stream not found exception for easy troublesooting

Author: ralphsu
Reviewer: ralphsu

Closes #284


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/994a1e58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/994a1e58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/994a1e58

Branch: refs/heads/develop
Commit: 994a1e584432e8bef5463be57faf955553bd4d01
Parents: 7e9dfaf
Author: Ralph, Su <su...@gmail.com>
Authored: Thu Jul 28 19:21:03 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Jul 28 19:21:03 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/utils/StreamIdConversion.java   |  4 +-
 .../engine/evaluator/PolicyHandlerContext.java  | 10 ++--
 .../impl/PolicyGroupEvaluatorImpl.java          | 10 ++--
 .../evaluator/impl/SiddhiPolicyHandler.java     |  4 +-
 .../evaluator/nodata/NoDataPolicyHandler.java   |  4 +-
 .../alert/engine/runner/AbstractStreamBolt.java | 51 ++++++++++++++++-
 .../eagle/alert/engine/runner/AlertBolt.java    | 36 ++----------
 .../alert/engine/runner/AlertPublisherBolt.java | 33 ++++++-----
 .../alert/engine/runner/StreamRouterBolt.java   | 60 ++++++--------------
 .../alert/engine/runner/UnitTopologyRunner.java | 37 +++++-------
 .../alert/engine/router/TestAlertBolt.java      |  2 +-
 .../engine/router/TestAlertPublisherBolt.java   |  3 +-
 .../engine/runner/TestStreamRouterBolt.java     | 31 +++++-----
 .../metadata/resource/MetadataResource.java     |  4 ++
 14 files changed, 137 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
index ccdb6f3..88661d1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
@@ -17,8 +17,8 @@ package org.apache.eagle.alert.utils;
  * limitations under the License.
  */
 public class StreamIdConversion {
-    private final static String STREAM_ID_TEMPLATE = "stream_%s_to_%s";
-    private final static String STREAM_ID_NUM_TEMPLATE = "stream_%s";
+    public final static String STREAM_ID_TEMPLATE = "stream_%s_to_%s";
+    public final static String STREAM_ID_NUM_TEMPLATE = "stream_%s";
     public static String generateStreamIdBetween(String sourceId, String targetId){
         return String.format(STREAM_ID_TEMPLATE,sourceId,targetId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
index 2898ebc..285ca13 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
@@ -22,7 +22,7 @@ import backtype.storm.metric.api.MultiCountMetric;
  */
 public class PolicyHandlerContext {
     private PolicyDefinition policyDefinition;
-    private PolicyGroupEvaluator parentEvaluator;
+    private PolicyGroupEvaluator policyEvaluator;
     private MultiCountMetric policyCounter;
     private String policyEvaluatorId;
 
@@ -34,12 +34,12 @@ public class PolicyHandlerContext {
         this.policyDefinition = policyDefinition;
     }
 
-    public PolicyGroupEvaluator getParentEvaluator() {
-        return parentEvaluator;
+    public PolicyGroupEvaluator getPolicyEvaluator() {
+        return policyEvaluator;
     }
 
-    public void setParentEvaluator(PolicyGroupEvaluator parentEvaluator) {
-        this.parentEvaluator = parentEvaluator;
+    public void setPolicyEvaluator(PolicyGroupEvaluator policyEvaluator) {
+        this.policyEvaluator = policyEvaluator;
     }
 
     public void setPolicyCounter(MultiCountMetric metric) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index 8a1f04a..228b7fb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.engine.evaluator.impl;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
@@ -32,6 +28,10 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
     private static final long serialVersionUID = -5499413193675985288L;
 
@@ -141,7 +141,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
                 PolicyHandlerContext context = new PolicyHandlerContext();
                 context.setPolicyCounter(this.context.counter());
                 context.setPolicyDefinition(policy);
-                context.setParentEvaluator(this);
+                context.setPolicyEvaluator(this);
                 context.setPolicyEvaluatorId(policyEvaluatorId);
                 handler.prepare(collector, context);
                 handlers.put(policy.getName(), handler);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index ed26408..b9d1eb2 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -85,8 +85,8 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
                 event.setData(e.getData());
                 event.setStreamId(outputStream);
                 event.setPolicy(context.getPolicyDefinition());
-                if (this.context.getParentEvaluator() != null) {
-                    event.setCreatedBy(context.getParentEvaluator().getName());
+                if (this.context.getPolicyEvaluator() != null) {
+                    event.setCreatedBy(context.getPolicyEvaluator().getName());
                 }
                 event.setCreatedTime(System.currentTimeMillis());
                 event.setSchema(definition);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index 0e9ab6c..9ad7529 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -191,8 +191,8 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
         event.setData(triggerEvent);
         event.setStreamId(policyDef.getOutputStreams().get(0));
         event.setPolicy(context.getPolicyDefinition());
-        if (this.context.getParentEvaluator() != null) {
-            event.setCreatedBy(context.getParentEvaluator().getName());
+        if (this.context.getPolicyEvaluator() != null) {
+            event.setCreatedBy(context.getPolicyEvaluator().getName());
         }
         event.setCreatedTime(System.currentTimeMillis());
         event.setSchema(sd);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index 131d85a..fe896d1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -16,10 +16,21 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import backtype.storm.metric.api.MultiCountMetric;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializers;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +45,7 @@ import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 
 @SuppressWarnings({"rawtypes", "serial"})
-public abstract class AbstractStreamBolt extends BaseRichBolt {
+public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
     private IMetadataChangeNotifyService changeNotifyService;
     private Config config;
@@ -42,7 +53,14 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
     protected OutputCollector collector;
     protected Map stormConf;
 
-    public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){
+    private String boltId;
+    protected PartitionedEventSerializer serializer;
+    protected volatile Map<String, StreamDefinition> sdf  = new HashMap<String, StreamDefinition>();
+    protected volatile String specVersion = "Not Initialized";
+    protected StreamContext streamContext;
+
+    public AbstractStreamBolt(String boltId, IMetadataChangeNotifyService changeNotifyService, Config config) {
+        this.boltId = boltId;
         this.changeNotifyService = changeNotifyService;
         this.config = config;
     }
@@ -57,15 +75,29 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.stormConf = stormConf;
         Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet");
+        this.stormConf = stormConf;
         this.collector = collector;
+        this.serializer = Serializers.newPartitionedEventSerializer(this);
         internalPrepare(collector,this.changeNotifyService,this.config,context);
     }
 
+
+    protected PartitionedEvent deserialize(Object object) throws IOException {
+        // byte[] in higher priority
+        if(object instanceof byte[]) {
+            return serializer.deserialize((byte[]) object);
+        } else if (object instanceof PartitionedEvent){
+            return (PartitionedEvent) object;
+        } else {
+            throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
+        }
+    }
+
     /**
      * subclass should implement more initialization for example
      * 1) register metadata change
+     * 2) init stream context
      * @param collector
      * @param metadataManager
      * @param config
@@ -92,4 +124,17 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
             declarer.declare(new Fields(AlertConstants.FIELD_0));
         }
     }
+
+    @Override
+    public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
+        if (sdf.containsKey(streamId)) {
+            return sdf.get(streamId);
+        } else {
+            throw new StreamDefinitionNotFoundException(streamId, specVersion);
+        }
+    }
+
+    public String getBoltId() {
+        return boltId;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 5daa236..683e571 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -31,6 +31,7 @@ import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
 import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
 import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
@@ -61,26 +62,12 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     // mapping from policy name to PolicyDefinition
     private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
 
-    private StreamContext streamContext;
-    private volatile Map<String, StreamDefinition> sdf  = new HashMap<String, StreamDefinition>();
-    private PartitionedEventSerializer serializer;
-    private volatile String specVersion = "Not Initialized";
 
-    public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){
-        super(changeNotifyService, config);
+    public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService){
+        super(boltId, changeNotifyService, config);
         this.boltId = boltId;
-        this.policyGroupEvaluator = policyGroupEvaluator;
-    }
-
-    PartitionedEvent deserialize(Object object) throws IOException {
-        // byte[] in higher priority
-        if(object instanceof byte[]) {
-            return serializer.deserialize((byte[]) object);
-        } else if (object instanceof PartitionedEvent){
-            return (PartitionedEvent) object;
-        } else {
-            throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
-        }
+        this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + "-evaluator_stage1"); // use bolt id as evaluatorId.
+        // TODO next stage evaluator
     }
 
     @Override
@@ -108,7 +95,6 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         // instantiate output lock object
         outputLock = new Object();
         streamContext = new StreamContextImpl(config,context.registerMetric("eagle.evaluator",new MultiCountMetric(),60),context);
-        serializer = Serializers.newPartitionedEventSerializer(this);
         alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock,streamContext);
         policyGroupEvaluator.init(streamContext, alertOutputCollector);
         metadataChangeNotifyService.registerListener(this);
@@ -149,16 +135,4 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         specVersion = spec.getVersion();
     }
 
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
-        if (sdf.containsKey(streamId)) {
-            return sdf.get(streamId);
-        } else {
-            throw new StreamDefinitionNotFoundException(streamId, specVersion);
-        }
-    }
-
-    public String getBoltId() {
-        return boltId;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 768cf48..5f58536 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -16,11 +16,13 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
@@ -30,29 +32,25 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 @SuppressWarnings("serial")
 public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
     private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
     private final AlertPublisher alertPublisher;
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
-    private StreamContextImpl streamContext;
 
-    public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){
-        super(coordinatorService, config);
-        this.alertPublisher = alertPublisher;
+    public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService){
+        super(alertPublisherName, coordinatorService, config);
+        this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
     }
 
     @Override
@@ -109,5 +107,6 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
 
         // switch
         cachedPublishments = newPublishmentsMap;
+        specVersion = pubSpec.getVersion();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 85c2f73..0c1d12c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -16,43 +16,27 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.router.StreamRouter;
 import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
 import org.apache.eagle.alert.utils.AlertConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
+import java.util.*;
 
 public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
     private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
@@ -63,35 +47,22 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
     private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
     // mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec
     private volatile Map<StreamPartition, StreamRouterSpec> cachedSRS = new HashMap<>();
-    private volatile Map<String,StreamDefinition> sdf = new HashMap<>();
-    private PartitionedEventSerializer serializer;
 
-    public StreamRouterBolt(StreamRouter router, Config config, IMetadataChangeNotifyService changeNotifyService) {
-        super(changeNotifyService, config);
-        this.router = router;
+    public StreamRouterBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
+        super(boltId, changeNotifyService, config);
+        this.router = new StreamRouterImpl(boltId + "-router");
     }
 
-    private StreamContext streamContext;
 
     @Override
     public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
         streamContext = new StreamContextImpl(config,context.registerMetric("eagle.router",new MultiCountMetric(),60),context);
-        serializer= Serializers.newPartitionedEventSerializer(this);
-        routeCollector = new StreamRouterBoltOutputCollector(this.router.getName(),collector,this.getOutputStreamIds(),streamContext,serializer);
+        routeCollector = new StreamRouterBoltOutputCollector(getBoltId(),collector,this.getOutputStreamIds(),streamContext,serializer);
         router.prepare(streamContext, routeCollector);
         changeNotifyService.registerListener(this);
         changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
     }
 
-    PartitionedEvent deserialize(Object object) throws IOException {
-        // byte[] in higher priority
-        if(object instanceof byte[]) {
-            return serializer.deserialize((byte[]) object);
-        } else {
-            return (PartitionedEvent) object;
-        }
-    }
-
     @Override
     public void execute(Tuple input) {
         try {
@@ -184,6 +155,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         // switch cache
         cachedSRS = newSRS;
         sdf = sds;
+        specVersion = spec.getVersion();
     }
 
     /**
@@ -206,8 +178,8 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         }
     }
 
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        return this.sdf.get(streamId);
+    public StreamRouter getStreamRouter() {
+        return router;
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 5a937f2..87faf82 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -19,20 +19,6 @@
 
 package org.apache.eagle.alert.engine.runner;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
@@ -40,9 +26,18 @@ import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * By default
@@ -86,11 +81,8 @@ public class UnitTopologyRunner {
                               int numOfPublishTasks,
                               Config config) {
 
-        StreamRouterImpl[] routers = new StreamRouterImpl[numOfRouterBolts];
         StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
-        PolicyGroupEvaluatorImpl[] evaluators = new PolicyGroupEvaluatorImpl[numOfAlertBolts];
         AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
-        AlertPublisherImpl publisher;
         AlertPublisherBolt publisherBolt;
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -102,19 +94,16 @@ public class UnitTopologyRunner {
 
         // construct StreamRouterBolt objects
         for(int i=0; i<numOfRouterBolts; i++){
-            routers[i] = new StreamRouterImpl(streamRouterBoltNamePrefix + i);
-            routerBolts[i] = new StreamRouterBolt(routers[i], config, getMetadataChangeNotifyService());
+            routerBolts[i] = new StreamRouterBolt(streamRouterBoltNamePrefix + i, config, getMetadataChangeNotifyService());
         }
 
         // construct AlertBolt objects
         for(int i=0; i<numOfAlertBolts; i++){
-            evaluators[i] = new PolicyGroupEvaluatorImpl(alertBoltNamePrefix + i);
-            alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, evaluators[i], config, getMetadataChangeNotifyService());
+            alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, config, getMetadataChangeNotifyService());
         }
 
         // construct AlertPublishBolt object
-        publisher = new AlertPublisherImpl(alertPublishBoltName);
-        publisherBolt = new AlertPublisherBolt(publisher, config, getMetadataChangeNotifyService());
+        publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
 
         // connect spout and router bolt, also define output streams for downstreaming alert bolt
         for(int i=0; i<numOfRouterBolts; i++){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 3bd7b9b..7dea167 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -167,7 +167,7 @@ public class TestAlertBolt {
         Config config = ConfigFactory.load();
         PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
         TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
-        AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
+        AlertBolt bolt = new AlertBolt("alertBolt1", config, mockChangeService);
         Map stormConf = new HashMap<>();
         TopologyContext topologyContext = mock(TopologyContext.class);
         when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 23ddd69..01f6de7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -87,8 +87,7 @@ public class TestAlertPublisherBolt {
         comparator.compare();
         Assert.assertTrue(comparator.getModified().size() == 1);
 
-        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
-        AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublisher, null, null);
+        AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null);
         publisherBolt.onAlertPublishSpecChange(spec1, null);
         publisherBolt.onAlertPublishSpecChange(spec2, null);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
index 2de2073..13550e1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -42,6 +42,7 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
 import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.alert.utils.StreamIdConversion;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
@@ -77,9 +78,8 @@ public class TestStreamRouterBolt {
     @Test
     public void testRouterWithSortAndRouteSpec() throws Exception{
         Config config = ConfigFactory.load();
-        StreamRouterImpl routerImpl = new StreamRouterImpl("testStreamRouterImpl");
         MockChangeService mockChangeService = new MockChangeService();
-        StreamRouterBolt bolt = new StreamRouterBolt(routerImpl, config, mockChangeService);
+        StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService);
 
         final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
         final List<PartitionedEvent> orderCollected = new ArrayList<>();
@@ -90,7 +90,7 @@ public class TestStreamRouterBolt {
             public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
                 PartitionedEvent event;
                 try {
-                    event = bolt.deserialize(tuple.get(0));
+                    event = routerBolt.deserialize(tuple.get(0));
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -120,7 +120,7 @@ public class TestStreamRouterBolt {
         Map stormConf = new HashMap<>();
         TopologyContext topologyContext = mock(TopologyContext.class);
         when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-        bolt.prepare(stormConf, topologyContext, collector);
+        routerBolt.prepare(stormConf, topologyContext, collector);
 
         String streamId = "cpuUsageStream";
         // StreamPartition, groupby col1 for stream cpuUsageStream
@@ -147,6 +147,7 @@ public class TestStreamRouterBolt {
         queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
         routerSpec.setTargetQueue(Collections.singletonList(queue));
         boltSpec.addRouterSpec(routerSpec);
+        boltSpec.setVersion("version1");
 
         // construct StreamDefinition
         StreamDefinition schema = new StreamDefinition();
@@ -158,8 +159,8 @@ public class TestStreamRouterBolt {
         Map<String, StreamDefinition> sds = new HashMap<>();
         sds.put(schema.getStreamId(), schema);
 
-        bolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
-        bolt.onStreamRouteBoltSpecChange(boltSpec, sds);
+        routerBolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
+        routerBolt.onStreamRouteBoltSpecChange(boltSpec, sds);
         GeneralTopologyContext context = mock(GeneralTopologyContext.class);
         int taskId = 1;
         when(context.getComponentId(taskId)).thenReturn("comp1");
@@ -185,7 +186,7 @@ public class TestStreamRouterBolt {
         pEvent.setEvent(event);
         pEvent.setPartition(sp);
         Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
+        routerBolt.execute(input);
 
         // construct another event with "value2"
         event = new StreamEvent();
@@ -197,7 +198,7 @@ public class TestStreamRouterBolt {
         pEvent.setPartition(sp);
         pEvent.setEvent(event);
         input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
+        routerBolt.execute(input);
 
         // construct another event with "value3"
         event = new StreamEvent();
@@ -209,7 +210,7 @@ public class TestStreamRouterBolt {
         pEvent.setPartition(sp);
         pEvent.setEvent(event);
         input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
+        routerBolt.execute(input);
 
         // construct another event with "value4"
         event = new StreamEvent();
@@ -221,7 +222,7 @@ public class TestStreamRouterBolt {
         pEvent.setPartition(sp);
         pEvent.setEvent(event);
         input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
+        routerBolt.execute(input);
 
         // construct another event with "value5", which will be thrown because two late
         event = new StreamEvent();
@@ -233,18 +234,20 @@ public class TestStreamRouterBolt {
         pEvent.setPartition(sp);
         pEvent.setEvent(event);
         input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
+        routerBolt.execute(input);
 
         Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
-        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt1",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt1"));
-        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt2",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt2"));
+        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1",streamCollected.keySet().contains(
+                String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
+        Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2",streamCollected.keySet().contains(
+                String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
 
         Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
         Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
 
         // The first 3 events are ticked automatically by window
 
-        bolt.cleanup();
+        routerBolt.cleanup();
 
         // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
         // The 5th event will be thrown because too late and out of margin

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 8712241..1799fa8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -54,6 +54,10 @@ public class MetadataResource {
 //    private IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
     private final IMetadataDao dao;
 
+    public MetadataResource(){
+        this.dao = MetadataDaoFactory.getInstance().getMetadataDao();;
+    }
+
     @Inject
     public MetadataResource(IMetadataDao dao){
         this.dao = dao;