You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/29 03:08:41 UTC

incubator-eagle git commit: [EAGLE-804] make interface StreamContext more abstract

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 70600b260 -> dcefa2068


[EAGLE-804] make interface StreamContext more abstract

 - Add interface StreamCounter to abstract the counter, so StreamContext no longer exposes Storm's MultiCountMetric directly

https://issues.apache.org/jira/browse/EAGLE-804

Author: r7raul1984 <ta...@yhd.com>

Closes #693 from r7raul1984/EAGLE-804.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dcefa206
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dcefa206
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dcefa206

Branch: refs/heads/master
Commit: dcefa206835542c187059878d2ffbae50c1df1a3
Parents: 70600b2
Author: r7raul1984 <ta...@yhd.com>
Authored: Tue Nov 29 11:08:33 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 29 11:08:33 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/StormMultiCountMetric.java     | 26 ++++++++++++++++++++
 .../eagle/alert/engine/StreamContext.java       |  3 +--
 .../eagle/alert/engine/StreamContextImpl.java   |  6 ++---
 .../eagle/alert/engine/StreamCounter.java       |  9 +++++++
 .../engine/evaluator/PolicyHandlerContext.java  |  8 +++---
 .../impl/AlertBoltOutputCollectorWrapper.java   |  2 +-
 .../evaluator/impl/AlertStreamCallback.java     |  2 +-
 .../impl/PolicyGroupEvaluatorImpl.java          | 10 ++++----
 .../evaluator/impl/SiddhiPolicyHandler.java     |  6 ++---
 .../impl/StreamRouterBoltOutputCollector.java   | 12 ++++-----
 .../engine/router/impl/StreamRouterImpl.java    |  8 +++---
 .../eagle/alert/engine/runner/AlertBolt.java    |  8 +++---
 .../alert/engine/runner/StreamRouterBolt.java   |  4 +--
 .../SiddhiCEPPolicyEventHandlerTest.java        |  5 ++--
 14 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
new file mode 100644
index 0000000..aa97b57
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
@@ -0,0 +1,26 @@
+package org.apache.eagle.alert.engine;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+public class StormMultiCountMetric implements StreamCounter {
+    private MultiCountMetric countMetric;
+
+    public StormMultiCountMetric(MultiCountMetric counter) {
+        this.countMetric = counter;
+    }
+
+    @Override
+    public void incr(String scopeName) {
+        countMetric.scope(scopeName).incr();
+    }
+
+    @Override
+    public void incrBy(String scopeName, int length) {
+        countMetric.scope(scopeName).incrBy(length);
+    }
+
+    @Override
+    public void scope(String scopeName) {
+        countMetric.scope(scopeName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
index a03932f..bafba83 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -1,6 +1,5 @@
 package org.apache.eagle.alert.engine;
 
-import backtype.storm.metric.api.MultiCountMetric;
 import com.typesafe.config.Config;
 
 /**
@@ -20,7 +19,7 @@ import com.typesafe.config.Config;
  * limitations under the License.
  */
 public interface StreamContext {
-    MultiCountMetric counter();
+    StreamCounter counter();
 
     Config config();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
index d02028a..e77a41b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
@@ -22,15 +22,15 @@ import com.typesafe.config.Config;
 
 public class StreamContextImpl implements StreamContext {
     private final Config config;
-    private final MultiCountMetric counter;
+    private final StreamCounter counter;
 
     public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
-        this.counter = counter;
+        this.counter = new StormMultiCountMetric(counter);
         this.config = config;
     }
 
     @Override
-    public MultiCountMetric counter() {
+    public StreamCounter counter() {
         return this.counter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
new file mode 100644
index 0000000..ff96c30
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
@@ -0,0 +1,9 @@
+package org.apache.eagle.alert.engine;
+
+public interface StreamCounter {
+    void incr(String scopeName);
+
+    void incrBy(String scopeName, int length);
+
+    void scope(String scopeName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
index 49f7eed..59d9e1f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
@@ -17,14 +17,14 @@
 
 package org.apache.eagle.alert.engine.evaluator;
 
+import org.apache.eagle.alert.engine.StreamCounter;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import backtype.storm.metric.api.MultiCountMetric;
 import com.typesafe.config.Config;
 
 public class PolicyHandlerContext {
     private PolicyDefinition policyDefinition;
     private PolicyGroupEvaluator policyEvaluator;
-    private MultiCountMetric policyCounter;
+    private StreamCounter policyCounter;
     private String policyEvaluatorId;
     private Config config;
 
@@ -44,11 +44,11 @@ public class PolicyHandlerContext {
         this.policyEvaluator = policyEvaluator;
     }
 
-    public void setPolicyCounter(MultiCountMetric metric) {
+    public void setPolicyCounter(StreamCounter metric) {
         this.policyCounter = metric;
     }
 
-    public MultiCountMetric getPolicyCounter() {
+    public StreamCounter getPolicyCounter() {
         return policyCounter;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index b1be2da..af2b9f8 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -64,7 +64,7 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
             }
 
             synchronized (outputLock) {
-                streamContext.counter().scope("alert_count").incr();
+                streamContext.counter().incr("alert_count");
                 delegate.emit(Arrays.asList(cloned, event));
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
index ee1853c..6b6e0d5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
@@ -90,6 +90,6 @@ public class AlertStreamCallback extends StreamCallback {
                 LOG.error(String.format("send event %s to index %d failed with exception. ", event, currentIndex), ex);
             }
         }
-        context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count")).incrBy(events.length);
+        context.getPolicyCounter().incrBy(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count"), events.length);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index eed4b3b..9b1d76c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -57,7 +57,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
     }
 
     public void nextEvent(PartitionedEvent event) {
-        this.context.counter().scope("receive_count").incr();
+        this.context.counter().incr("receive_count");
         dispatch(event);
     }
 
@@ -87,19 +87,19 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
             if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) {
                 try {
                     handled = true;
-                    this.context.counter().scope("eval_count").incr();
+                    this.context.counter().incr("eval_count");
                     policyStreamHandler.getValue().send(partitionedEvent.getEvent());
                 } catch (Exception e) {
-                    this.context.counter().scope("fail_count").incr();
+                    this.context.counter().incr("fail_count");
                     LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
                 }
             }
         }
         if (!handled) {
-            this.context.counter().scope("drop_count").incr();
+            this.context.counter().incr("drop_count");
             LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent);
         } else {
-            this.context.counter().scope("accept_count").incr();
+            this.context.counter().incr("accept_count");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index c668935..72aca06 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -88,18 +88,18 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
     }
 
     public void send(StreamEvent event) throws Exception {
-        context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count")).incr();
+        context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count"));
         String streamId = event.getStreamId();
         InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
         if (inputHandler != null) {
-            context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count")).incr();
+            context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count"));
             inputHandler.send(event.getTimestamp(), event.getData());
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("sent event to siddhi stream {} ", streamId);
             }
         } else {
-            context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count")).incr();
+            context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count"));
             LOG.warn("No input handler found for stream {}", streamId);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 3a53b44..77e8daa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -68,7 +68,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
 
     public void emit(PartitionedEvent event) {
         try {
-            this.streamContext.counter().scope("send_count").incr();
+            this.streamContext.counter().incr("send_count");
             StreamPartition partition = event.getPartition();
             List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition);
             if (routerSpecs == null || routerSpecs.size() <= 0) {
@@ -83,7 +83,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
             if (routePartitionerMap.get(partition) == null) {
                 LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
                 synchronized (outputLock) {
-                    this.streamContext.counter().scope("fail_count").incr();
+                    this.streamContext.counter().incr("fail_count");
                     this.outputCollector.fail(event.getAnchor());
                 }
                 return;
@@ -111,9 +111,9 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                             } else {
                                 outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
                             }
-                            this.streamContext.counter().scope("emit_count").incr();
+                            this.streamContext.counter().incr("emit_count");
                         } catch (RuntimeException ex) {
-                            this.streamContext.counter().scope("fail_count").incr();
+                            this.streamContext.counter().incr("fail_count");
                             LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex);
                             throw ex;
                         }
@@ -124,7 +124,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             synchronized (outputLock) {
-                this.streamContext.counter().scope("fail_count").incr();
+                this.streamContext.counter().incr("fail_count");
                 this.outputCollector.fail(event.getAnchor());
             }
         }
@@ -217,7 +217,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
     @Override
     public void drop(PartitionedEvent event) {
         synchronized (outputLock) {
-            this.streamContext.counter().scope("drop_count").incr();
+            this.streamContext.counter().incr("drop_count");
             if (event.getAnchor() != null) {
                 this.outputCollector.ack(event.getAnchor());
             } else {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
index 7b2a1de..41523cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
@@ -72,13 +72,13 @@ public class StreamRouterImpl implements StreamRouter {
      * @param event StreamEvent
      */
     public void nextEvent(PartitionedEvent event) {
-        this.context.counter().scope("receive_count").incr();
+        this.context.counter().incr("receive_count");
         if (!dispatchToSortHandler(event)) {
-            this.context.counter().scope("direct_count").incr();
+            this.context.counter().incr("direct_count");
             // Pass through directly if no need to sort
             outputCollector.emit(event);
         }
-        this.context.counter().scope("sort_count").incr();
+        this.context.counter().incr("sort_count");
         // Update stream clock time if moving forward and trigger all tick listeners
         streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp());
     }
@@ -96,7 +96,7 @@ public class StreamRouterImpl implements StreamRouter {
         if (sortHandler == null) {
             if (event.isSortRequired()) {
                 LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
-                this.context.counter().scope("miss_sort_count").incr();
+                this.context.counter().incr("miss_sort_count");
             }
             return false;
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 627a218..c946fee 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -89,7 +89,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
 
     @Override
     public void execute(Tuple input) {
-        this.streamContext.counter().scope("execute_count").incr();
+        this.streamContext.counter().incr("execute_count");
         try {
             PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0));
             if (logEventEnabled) {
@@ -118,7 +118,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
                 LOG.warn(message);
 
                 // send out metrics for meta conflict
-                this.streamContext.counter().scope("meta_conflict").incr();
+                this.streamContext.counter().incr("meta_conflict");
 
                 ExecutorService executors = SingletonExecutor.getExecutorService();
                 executors.submit(() -> {
@@ -144,11 +144,11 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
             synchronized (outputLock) {
                 this.collector.ack(input);
             }
-            this.streamContext.counter().scope("ack_count").incr();
+            this.streamContext.counter().incr("ack_count");
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             synchronized (outputLock) {
-                this.streamContext.counter().scope("fail_count").incr();
+                this.streamContext.counter().incr("fail_count");
                 this.collector.fail(input);
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 6c39189..7acd7e4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -78,10 +78,10 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
     @Override
     public void execute(Tuple input) {
         try {
-            this.streamContext.counter().scope("execute_count").incr();
+            this.streamContext.counter().incr("execute_count");
             this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
         } catch (Exception ex) {
-            this.streamContext.counter().scope("fail_count").incr();
+            this.streamContext.counter().incr("fail_count");
             LOG.error(ex.getMessage(), ex);
             this.collector.fail(input);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
index 476d71f..89039f5 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
@@ -18,6 +18,7 @@ package org.apache.eagle.alert.engine.evaluator;
 
 import backtype.storm.metric.api.MultiCountMetric;
 import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.StormMultiCountMetric;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
@@ -59,7 +60,7 @@ public class SiddhiCEPPolicyEventHandlerTest {
         PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
         PolicyHandlerContext context = new PolicyHandlerContext();
         context.setPolicyDefinition(policyDefinition);
-        context.setPolicyCounter(new MultiCountMetric());
+        context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric()));
         context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
         handler.prepare(collector, context);
         StreamEvent event = StreamEvent.builder()
@@ -104,7 +105,7 @@ public class SiddhiCEPPolicyEventHandlerTest {
         handler = new SiddhiPolicyHandler(ssd, 0);
         PolicyHandlerContext context = new PolicyHandlerContext();
         context.setPolicyDefinition(policyDefinition);
-        context.setPolicyCounter(new MultiCountMetric());
+        context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric()));
         context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
         handler.prepare(collector, context);