You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/29 03:08:41 UTC
incubator-eagle git commit: [EAGLE-804] make interface StreamContext
more abstract
Repository: incubator-eagle
Updated Branches:
refs/heads/master 70600b260 -> dcefa2068
[EAGLE-804] make interface StreamContext more abstract
- Add interface StreamCounter to abstract count
https://issues.apache.org/jira/browse/EAGLE-804
Author: r7raul1984 <ta...@yhd.com>
Closes #693 from r7raul1984/EAGLE-804.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dcefa206
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dcefa206
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dcefa206
Branch: refs/heads/master
Commit: dcefa206835542c187059878d2ffbae50c1df1a3
Parents: 70600b2
Author: r7raul1984 <ta...@yhd.com>
Authored: Tue Nov 29 11:08:33 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Nov 29 11:08:33 2016 +0800
----------------------------------------------------------------------
.../alert/engine/StormMultiCountMetric.java | 26 ++++++++++++++++++++
.../eagle/alert/engine/StreamContext.java | 3 +--
.../eagle/alert/engine/StreamContextImpl.java | 6 ++---
.../eagle/alert/engine/StreamCounter.java | 9 +++++++
.../engine/evaluator/PolicyHandlerContext.java | 8 +++---
.../impl/AlertBoltOutputCollectorWrapper.java | 2 +-
.../evaluator/impl/AlertStreamCallback.java | 2 +-
.../impl/PolicyGroupEvaluatorImpl.java | 10 ++++----
.../evaluator/impl/SiddhiPolicyHandler.java | 6 ++---
.../impl/StreamRouterBoltOutputCollector.java | 12 ++++-----
.../engine/router/impl/StreamRouterImpl.java | 8 +++---
.../eagle/alert/engine/runner/AlertBolt.java | 8 +++---
.../alert/engine/runner/StreamRouterBolt.java | 4 +--
.../SiddhiCEPPolicyEventHandlerTest.java | 5 ++--
14 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
new file mode 100644
index 0000000..aa97b57
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java
@@ -0,0 +1,26 @@
+package org.apache.eagle.alert.engine;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+public class StormMultiCountMetric implements StreamCounter {
+ private MultiCountMetric countMetric;
+
+ public StormMultiCountMetric(MultiCountMetric counter) {
+ this.countMetric = counter;
+ }
+
+ @Override
+ public void incr(String scopeName) {
+ countMetric.scope(scopeName).incr();
+ }
+
+ @Override
+ public void incrBy(String scopeName, int length) {
+ countMetric.scope(scopeName).incrBy(length);
+ }
+
+ @Override
+ public void scope(String scopeName) {
+ countMetric.scope(scopeName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
index a03932f..bafba83 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -1,6 +1,5 @@
package org.apache.eagle.alert.engine;
-import backtype.storm.metric.api.MultiCountMetric;
import com.typesafe.config.Config;
/**
@@ -20,7 +19,7 @@ import com.typesafe.config.Config;
* limitations under the License.
*/
public interface StreamContext {
- MultiCountMetric counter();
+ StreamCounter counter();
Config config();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
index d02028a..e77a41b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
@@ -22,15 +22,15 @@ import com.typesafe.config.Config;
public class StreamContextImpl implements StreamContext {
private final Config config;
- private final MultiCountMetric counter;
+ private final StreamCounter counter;
public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
- this.counter = counter;
+ this.counter = new StormMultiCountMetric(counter);
this.config = config;
}
@Override
- public MultiCountMetric counter() {
+ public StreamCounter counter() {
return this.counter;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
new file mode 100644
index 0000000..ff96c30
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java
@@ -0,0 +1,9 @@
+package org.apache.eagle.alert.engine;
+
+public interface StreamCounter {
+ void incr(String scopeName);
+
+ void incrBy(String scopeName, int length);
+
+ void scope(String scopeName);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
index 49f7eed..59d9e1f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
@@ -17,14 +17,14 @@
package org.apache.eagle.alert.engine.evaluator;
+import org.apache.eagle.alert.engine.StreamCounter;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import backtype.storm.metric.api.MultiCountMetric;
import com.typesafe.config.Config;
public class PolicyHandlerContext {
private PolicyDefinition policyDefinition;
private PolicyGroupEvaluator policyEvaluator;
- private MultiCountMetric policyCounter;
+ private StreamCounter policyCounter;
private String policyEvaluatorId;
private Config config;
@@ -44,11 +44,11 @@ public class PolicyHandlerContext {
this.policyEvaluator = policyEvaluator;
}
- public void setPolicyCounter(MultiCountMetric metric) {
+ public void setPolicyCounter(StreamCounter metric) {
this.policyCounter = metric;
}
- public MultiCountMetric getPolicyCounter() {
+ public StreamCounter getPolicyCounter() {
return policyCounter;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index b1be2da..af2b9f8 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -64,7 +64,7 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
}
synchronized (outputLock) {
- streamContext.counter().scope("alert_count").incr();
+ streamContext.counter().incr("alert_count");
delegate.emit(Arrays.asList(cloned, event));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
index ee1853c..6b6e0d5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java
@@ -90,6 +90,6 @@ public class AlertStreamCallback extends StreamCallback {
LOG.error(String.format("send event %s to index %d failed with exception. ", event, currentIndex), ex);
}
}
- context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count")).incrBy(events.length);
+ context.getPolicyCounter().incrBy(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count"), events.length);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index eed4b3b..9b1d76c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -57,7 +57,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
}
public void nextEvent(PartitionedEvent event) {
- this.context.counter().scope("receive_count").incr();
+ this.context.counter().incr("receive_count");
dispatch(event);
}
@@ -87,19 +87,19 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) {
try {
handled = true;
- this.context.counter().scope("eval_count").incr();
+ this.context.counter().incr("eval_count");
policyStreamHandler.getValue().send(partitionedEvent.getEvent());
} catch (Exception e) {
- this.context.counter().scope("fail_count").incr();
+ this.context.counter().incr("fail_count");
LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
}
}
}
if (!handled) {
- this.context.counter().scope("drop_count").incr();
+ this.context.counter().incr("drop_count");
LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent);
} else {
- this.context.counter().scope("accept_count").incr();
+ this.context.counter().incr("accept_count");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index c668935..72aca06 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -88,18 +88,18 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
}
public void send(StreamEvent event) throws Exception {
- context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count")).incr();
+ context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count"));
String streamId = event.getStreamId();
InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
if (inputHandler != null) {
- context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count")).incr();
+ context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count"));
inputHandler.send(event.getTimestamp(), event.getData());
if (LOG.isDebugEnabled()) {
LOG.debug("sent event to siddhi stream {} ", streamId);
}
} else {
- context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count")).incr();
+ context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count"));
LOG.warn("No input handler found for stream {}", streamId);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 3a53b44..77e8daa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -68,7 +68,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
public void emit(PartitionedEvent event) {
try {
- this.streamContext.counter().scope("send_count").incr();
+ this.streamContext.counter().incr("send_count");
StreamPartition partition = event.getPartition();
List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition);
if (routerSpecs == null || routerSpecs.size() <= 0) {
@@ -83,7 +83,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
if (routePartitionerMap.get(partition) == null) {
LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
synchronized (outputLock) {
- this.streamContext.counter().scope("fail_count").incr();
+ this.streamContext.counter().incr("fail_count");
this.outputCollector.fail(event.getAnchor());
}
return;
@@ -111,9 +111,9 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
} else {
outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
}
- this.streamContext.counter().scope("emit_count").incr();
+ this.streamContext.counter().incr("emit_count");
} catch (RuntimeException ex) {
- this.streamContext.counter().scope("fail_count").incr();
+ this.streamContext.counter().incr("fail_count");
LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex);
throw ex;
}
@@ -124,7 +124,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
} catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
synchronized (outputLock) {
- this.streamContext.counter().scope("fail_count").incr();
+ this.streamContext.counter().incr("fail_count");
this.outputCollector.fail(event.getAnchor());
}
}
@@ -217,7 +217,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
@Override
public void drop(PartitionedEvent event) {
synchronized (outputLock) {
- this.streamContext.counter().scope("drop_count").incr();
+ this.streamContext.counter().incr("drop_count");
if (event.getAnchor() != null) {
this.outputCollector.ack(event.getAnchor());
} else {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
index 7b2a1de..41523cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
@@ -72,13 +72,13 @@ public class StreamRouterImpl implements StreamRouter {
* @param event StreamEvent
*/
public void nextEvent(PartitionedEvent event) {
- this.context.counter().scope("receive_count").incr();
+ this.context.counter().incr("receive_count");
if (!dispatchToSortHandler(event)) {
- this.context.counter().scope("direct_count").incr();
+ this.context.counter().incr("direct_count");
// Pass through directly if no need to sort
outputCollector.emit(event);
}
- this.context.counter().scope("sort_count").incr();
+ this.context.counter().incr("sort_count");
// Update stream clock time if moving forward and trigger all tick listeners
streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp());
}
@@ -96,7 +96,7 @@ public class StreamRouterImpl implements StreamRouter {
if (sortHandler == null) {
if (event.isSortRequired()) {
LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
- this.context.counter().scope("miss_sort_count").incr();
+ this.context.counter().incr("miss_sort_count");
}
return false;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 627a218..c946fee 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -89,7 +89,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
@Override
public void execute(Tuple input) {
- this.streamContext.counter().scope("execute_count").incr();
+ this.streamContext.counter().incr("execute_count");
try {
PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0));
if (logEventEnabled) {
@@ -118,7 +118,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
LOG.warn(message);
// send out metrics for meta conflict
- this.streamContext.counter().scope("meta_conflict").incr();
+ this.streamContext.counter().incr("meta_conflict");
ExecutorService executors = SingletonExecutor.getExecutorService();
executors.submit(() -> {
@@ -144,11 +144,11 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
synchronized (outputLock) {
this.collector.ack(input);
}
- this.streamContext.counter().scope("ack_count").incr();
+ this.streamContext.counter().incr("ack_count");
} catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
synchronized (outputLock) {
- this.streamContext.counter().scope("fail_count").incr();
+ this.streamContext.counter().incr("fail_count");
this.collector.fail(input);
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 6c39189..7acd7e4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -78,10 +78,10 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
@Override
public void execute(Tuple input) {
try {
- this.streamContext.counter().scope("execute_count").incr();
+ this.streamContext.counter().incr("execute_count");
this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input));
} catch (Exception ex) {
- this.streamContext.counter().scope("fail_count").incr();
+ this.streamContext.counter().incr("fail_count");
LOG.error(ex.getMessage(), ex);
this.collector.fail(input);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
index 476d71f..89039f5 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
@@ -18,6 +18,7 @@ package org.apache.eagle.alert.engine.evaluator;
import backtype.storm.metric.api.MultiCountMetric;
import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.StormMultiCountMetric;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
@@ -59,7 +60,7 @@ public class SiddhiCEPPolicyEventHandlerTest {
PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
PolicyHandlerContext context = new PolicyHandlerContext();
context.setPolicyDefinition(policyDefinition);
- context.setPolicyCounter(new MultiCountMetric());
+ context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric()));
context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
handler.prepare(collector, context);
StreamEvent event = StreamEvent.builder()
@@ -104,7 +105,7 @@ public class SiddhiCEPPolicyEventHandlerTest {
handler = new SiddhiPolicyHandler(ssd, 0);
PolicyHandlerContext context = new PolicyHandlerContext();
context.setPolicyDefinition(policyDefinition);
- context.setPolicyCounter(new MultiCountMetric());
+ context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric()));
context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
handler.prepare(collector, context);