You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/07/28 11:22:08 UTC
incubator-eagle git commit: EAGLE-401: StreamRouterBolt and
PublishBolt also have NPE
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 7e9dfaf3d -> 994a1e584
EAGLE-401: StreamRouterBolt and PublishBolt also have NPE
Make all stream bolts avoid NPEs; instead, throw a stream-definition-not-found exception for easier troubleshooting.
Author: ralphsu
Reviewer: ralphsu
Closes #284
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/994a1e58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/994a1e58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/994a1e58
Branch: refs/heads/develop
Commit: 994a1e584432e8bef5463be57faf955553bd4d01
Parents: 7e9dfaf
Author: Ralph, Su <su...@gmail.com>
Authored: Thu Jul 28 19:21:03 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Jul 28 19:21:03 2016 +0800
----------------------------------------------------------------------
.../eagle/alert/utils/StreamIdConversion.java | 4 +-
.../engine/evaluator/PolicyHandlerContext.java | 10 ++--
.../impl/PolicyGroupEvaluatorImpl.java | 10 ++--
.../evaluator/impl/SiddhiPolicyHandler.java | 4 +-
.../evaluator/nodata/NoDataPolicyHandler.java | 4 +-
.../alert/engine/runner/AbstractStreamBolt.java | 51 ++++++++++++++++-
.../eagle/alert/engine/runner/AlertBolt.java | 36 ++----------
.../alert/engine/runner/AlertPublisherBolt.java | 33 ++++++-----
.../alert/engine/runner/StreamRouterBolt.java | 60 ++++++--------------
.../alert/engine/runner/UnitTopologyRunner.java | 37 +++++-------
.../alert/engine/router/TestAlertBolt.java | 2 +-
.../engine/router/TestAlertPublisherBolt.java | 3 +-
.../engine/runner/TestStreamRouterBolt.java | 31 +++++-----
.../metadata/resource/MetadataResource.java | 4 ++
14 files changed, 137 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
index ccdb6f3..88661d1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
@@ -17,8 +17,8 @@ package org.apache.eagle.alert.utils;
* limitations under the License.
*/
public class StreamIdConversion {
- private final static String STREAM_ID_TEMPLATE = "stream_%s_to_%s";
- private final static String STREAM_ID_NUM_TEMPLATE = "stream_%s";
+ public final static String STREAM_ID_TEMPLATE = "stream_%s_to_%s";
+ public final static String STREAM_ID_NUM_TEMPLATE = "stream_%s";
public static String generateStreamIdBetween(String sourceId, String targetId){
return String.format(STREAM_ID_TEMPLATE,sourceId,targetId);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
index 2898ebc..285ca13 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java
@@ -22,7 +22,7 @@ import backtype.storm.metric.api.MultiCountMetric;
*/
public class PolicyHandlerContext {
private PolicyDefinition policyDefinition;
- private PolicyGroupEvaluator parentEvaluator;
+ private PolicyGroupEvaluator policyEvaluator;
private MultiCountMetric policyCounter;
private String policyEvaluatorId;
@@ -34,12 +34,12 @@ public class PolicyHandlerContext {
this.policyDefinition = policyDefinition;
}
- public PolicyGroupEvaluator getParentEvaluator() {
- return parentEvaluator;
+ public PolicyGroupEvaluator getPolicyEvaluator() {
+ return policyEvaluator;
}
- public void setParentEvaluator(PolicyGroupEvaluator parentEvaluator) {
- this.parentEvaluator = parentEvaluator;
+ public void setPolicyEvaluator(PolicyGroupEvaluator policyEvaluator) {
+ this.policyEvaluator = policyEvaluator;
}
public void setPolicyCounter(MultiCountMetric metric) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index 8a1f04a..228b7fb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -16,10 +16,6 @@
*/
package org.apache.eagle.alert.engine.evaluator.impl;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.eagle.alert.engine.AlertStreamCollector;
import org.apache.eagle.alert.engine.StreamContext;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
@@ -32,6 +28,10 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
private static final long serialVersionUID = -5499413193675985288L;
@@ -141,7 +141,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
PolicyHandlerContext context = new PolicyHandlerContext();
context.setPolicyCounter(this.context.counter());
context.setPolicyDefinition(policy);
- context.setParentEvaluator(this);
+ context.setPolicyEvaluator(this);
context.setPolicyEvaluatorId(policyEvaluatorId);
handler.prepare(collector, context);
handlers.put(policy.getName(), handler);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index ed26408..b9d1eb2 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -85,8 +85,8 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
event.setData(e.getData());
event.setStreamId(outputStream);
event.setPolicy(context.getPolicyDefinition());
- if (this.context.getParentEvaluator() != null) {
- event.setCreatedBy(context.getParentEvaluator().getName());
+ if (this.context.getPolicyEvaluator() != null) {
+ event.setCreatedBy(context.getPolicyEvaluator().getName());
}
event.setCreatedTime(System.currentTimeMillis());
event.setSchema(definition);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index 0e9ab6c..9ad7529 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -191,8 +191,8 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
event.setData(triggerEvent);
event.setStreamId(policyDef.getOutputStreams().get(0));
event.setPolicy(context.getPolicyDefinition());
- if (this.context.getParentEvaluator() != null) {
- event.setCreatedBy(context.getParentEvaluator().getName());
+ if (this.context.getPolicyEvaluator() != null) {
+ event.setCreatedBy(context.getPolicyEvaluator().getName());
}
event.setCreatedTime(System.currentTimeMillis());
event.setSchema(sd);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index 131d85a..fe896d1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -16,10 +16,21 @@
*/
package org.apache.eagle.alert.engine.runner;
+import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import backtype.storm.metric.api.MultiCountMetric;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.StreamContextImpl;
import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializers;
import org.apache.eagle.alert.utils.AlertConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +45,7 @@ import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
@SuppressWarnings({"rawtypes", "serial"})
-public abstract class AbstractStreamBolt extends BaseRichBolt {
+public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
private IMetadataChangeNotifyService changeNotifyService;
private Config config;
@@ -42,7 +53,14 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
protected OutputCollector collector;
protected Map stormConf;
- public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){
+ private String boltId;
+ protected PartitionedEventSerializer serializer;
+ protected volatile Map<String, StreamDefinition> sdf = new HashMap<String, StreamDefinition>();
+ protected volatile String specVersion = "Not Initialized";
+ protected StreamContext streamContext;
+
+ public AbstractStreamBolt(String boltId, IMetadataChangeNotifyService changeNotifyService, Config config) {
+ this.boltId = boltId;
this.changeNotifyService = changeNotifyService;
this.config = config;
}
@@ -57,15 +75,29 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.stormConf = stormConf;
Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet");
+ this.stormConf = stormConf;
this.collector = collector;
+ this.serializer = Serializers.newPartitionedEventSerializer(this);
internalPrepare(collector,this.changeNotifyService,this.config,context);
}
+
+ protected PartitionedEvent deserialize(Object object) throws IOException {
+ // byte[] in higher priority
+ if(object instanceof byte[]) {
+ return serializer.deserialize((byte[]) object);
+ } else if (object instanceof PartitionedEvent){
+ return (PartitionedEvent) object;
+ } else {
+ throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
+ }
+ }
+
/**
* subclass should implement more initialization for example
* 1) register metadata change
+ * 2) init stream context
* @param collector
* @param metadataManager
* @param config
@@ -92,4 +124,17 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
declarer.declare(new Fields(AlertConstants.FIELD_0));
}
}
+
+ @Override
+ public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
+ if (sdf.containsKey(streamId)) {
+ return sdf.get(streamId);
+ } else {
+ throw new StreamDefinitionNotFoundException(streamId, specVersion);
+ }
+ }
+
+ public String getBoltId() {
+ return boltId;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 5daa236..683e571 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -31,6 +31,7 @@ import org.apache.eagle.alert.engine.StreamContextImpl;
import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper;
+import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
@@ -61,26 +62,12 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
// mapping from policy name to PolicyDefinition
private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
- private StreamContext streamContext;
- private volatile Map<String, StreamDefinition> sdf = new HashMap<String, StreamDefinition>();
- private PartitionedEventSerializer serializer;
- private volatile String specVersion = "Not Initialized";
- public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){
- super(changeNotifyService, config);
+ public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService){
+ super(boltId, changeNotifyService, config);
this.boltId = boltId;
- this.policyGroupEvaluator = policyGroupEvaluator;
- }
-
- PartitionedEvent deserialize(Object object) throws IOException {
- // byte[] in higher priority
- if(object instanceof byte[]) {
- return serializer.deserialize((byte[]) object);
- } else if (object instanceof PartitionedEvent){
- return (PartitionedEvent) object;
- } else {
- throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
- }
+ this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + "-evaluator_stage1"); // use bolt id as evaluatorId.
+ // TODO next stage evaluator
}
@Override
@@ -108,7 +95,6 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
// instantiate output lock object
outputLock = new Object();
streamContext = new StreamContextImpl(config,context.registerMetric("eagle.evaluator",new MultiCountMetric(),60),context);
- serializer = Serializers.newPartitionedEventSerializer(this);
alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock,streamContext);
policyGroupEvaluator.init(streamContext, alertOutputCollector);
metadataChangeNotifyService.registerListener(this);
@@ -149,16 +135,4 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
specVersion = spec.getVersion();
}
- @Override
- public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
- if (sdf.containsKey(streamId)) {
- return sdf.get(streamId);
- } else {
- throw new StreamDefinitionNotFoundException(streamId, specVersion);
- }
- }
-
- public String getBoltId() {
- return boltId;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 768cf48..5f58536 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -16,11 +16,13 @@
*/
package org.apache.eagle.alert.engine.runner;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.engine.StreamContextImpl;
import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
@@ -30,29 +32,25 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
import org.apache.eagle.alert.utils.AlertConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
@SuppressWarnings("serial")
public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
private final AlertPublisher alertPublisher;
private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
- private StreamContextImpl streamContext;
- public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){
- super(coordinatorService, config);
- this.alertPublisher = alertPublisher;
+ public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService){
+ super(alertPublisherName, coordinatorService, config);
+ this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
}
@Override
@@ -109,5 +107,6 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
// switch
cachedPublishments = newPublishmentsMap;
+ specVersion = pubSpec.getVersion();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 85c2f73..0c1d12c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -16,43 +16,27 @@
*/
package org.apache.eagle.alert.engine.runner;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.StreamContext;
import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.router.StreamRouter;
import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
import org.apache.eagle.alert.utils.AlertConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-
-import com.typesafe.config.Config;
+import java.util.*;
public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
@@ -63,35 +47,22 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
// mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec
private volatile Map<StreamPartition, StreamRouterSpec> cachedSRS = new HashMap<>();
- private volatile Map<String,StreamDefinition> sdf = new HashMap<>();
- private PartitionedEventSerializer serializer;
- public StreamRouterBolt(StreamRouter router, Config config, IMetadataChangeNotifyService changeNotifyService) {
- super(changeNotifyService, config);
- this.router = router;
+ public StreamRouterBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
+ super(boltId, changeNotifyService, config);
+ this.router = new StreamRouterImpl(boltId + "-router");
}
- private StreamContext streamContext;
@Override
public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
streamContext = new StreamContextImpl(config,context.registerMetric("eagle.router",new MultiCountMetric(),60),context);
- serializer= Serializers.newPartitionedEventSerializer(this);
- routeCollector = new StreamRouterBoltOutputCollector(this.router.getName(),collector,this.getOutputStreamIds(),streamContext,serializer);
+ routeCollector = new StreamRouterBoltOutputCollector(getBoltId(),collector,this.getOutputStreamIds(),streamContext,serializer);
router.prepare(streamContext, routeCollector);
changeNotifyService.registerListener(this);
changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
}
- PartitionedEvent deserialize(Object object) throws IOException {
- // byte[] in higher priority
- if(object instanceof byte[]) {
- return serializer.deserialize((byte[]) object);
- } else {
- return (PartitionedEvent) object;
- }
- }
-
@Override
public void execute(Tuple input) {
try {
@@ -184,6 +155,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
// switch cache
cachedSRS = newSRS;
sdf = sds;
+ specVersion = spec.getVersion();
}
/**
@@ -206,8 +178,8 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
}
}
- @Override
- public StreamDefinition getStreamDefinition(String streamId) {
- return this.sdf.get(streamId);
+ public StreamRouter getStreamRouter() {
+ return router;
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 5a937f2..87faf82 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -19,20 +19,6 @@
package org.apache.eagle.alert.engine.runner;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
-import org.apache.eagle.alert.engine.spout.CorrelationSpout;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
@@ -40,9 +26,18 @@ import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
-
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.spout.CorrelationSpout;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
/**
* By default
@@ -86,11 +81,8 @@ public class UnitTopologyRunner {
int numOfPublishTasks,
Config config) {
- StreamRouterImpl[] routers = new StreamRouterImpl[numOfRouterBolts];
StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
- PolicyGroupEvaluatorImpl[] evaluators = new PolicyGroupEvaluatorImpl[numOfAlertBolts];
AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts];
- AlertPublisherImpl publisher;
AlertPublisherBolt publisherBolt;
TopologyBuilder builder = new TopologyBuilder();
@@ -102,19 +94,16 @@ public class UnitTopologyRunner {
// construct StreamRouterBolt objects
for(int i=0; i<numOfRouterBolts; i++){
- routers[i] = new StreamRouterImpl(streamRouterBoltNamePrefix + i);
- routerBolts[i] = new StreamRouterBolt(routers[i], config, getMetadataChangeNotifyService());
+ routerBolts[i] = new StreamRouterBolt(streamRouterBoltNamePrefix + i, config, getMetadataChangeNotifyService());
}
// construct AlertBolt objects
for(int i=0; i<numOfAlertBolts; i++){
- evaluators[i] = new PolicyGroupEvaluatorImpl(alertBoltNamePrefix + i);
- alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, evaluators[i], config, getMetadataChangeNotifyService());
+ alertBolts[i] = new AlertBolt(alertBoltNamePrefix+i, config, getMetadataChangeNotifyService());
}
// construct AlertPublishBolt object
- publisher = new AlertPublisherImpl(alertPublishBoltName);
- publisherBolt = new AlertPublisherBolt(publisher, config, getMetadataChangeNotifyService());
+ publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService());
// connect spout and router bolt, also define output streams for downstreaming alert bolt
for(int i=0; i<numOfRouterBolts; i++){
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 3bd7b9b..7dea167 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -167,7 +167,7 @@ public class TestAlertBolt {
Config config = ConfigFactory.load();
PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
- AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
+ AlertBolt bolt = new AlertBolt("alertBolt1", config, mockChangeService);
Map stormConf = new HashMap<>();
TopologyContext topologyContext = mock(TopologyContext.class);
when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 23ddd69..01f6de7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -87,8 +87,7 @@ public class TestAlertPublisherBolt {
comparator.compare();
Assert.assertTrue(comparator.getModified().size() == 1);
- AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
- AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublisher, null, null);
+ AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null);
publisherBolt.onAlertPublishSpecChange(spec1, null);
publisherBolt.onAlertPublishSpecChange(spec2, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
index 2de2073..13550e1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -42,6 +42,7 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.alert.utils.StreamIdConversion;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@@ -77,9 +78,8 @@ public class TestStreamRouterBolt {
@Test
public void testRouterWithSortAndRouteSpec() throws Exception{
Config config = ConfigFactory.load();
- StreamRouterImpl routerImpl = new StreamRouterImpl("testStreamRouterImpl");
MockChangeService mockChangeService = new MockChangeService();
- StreamRouterBolt bolt = new StreamRouterBolt(routerImpl, config, mockChangeService);
+ StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService);
final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
final List<PartitionedEvent> orderCollected = new ArrayList<>();
@@ -90,7 +90,7 @@ public class TestStreamRouterBolt {
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
PartitionedEvent event;
try {
- event = bolt.deserialize(tuple.get(0));
+ event = routerBolt.deserialize(tuple.get(0));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -120,7 +120,7 @@ public class TestStreamRouterBolt {
Map stormConf = new HashMap<>();
TopologyContext topologyContext = mock(TopologyContext.class);
when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
- bolt.prepare(stormConf, topologyContext, collector);
+ routerBolt.prepare(stormConf, topologyContext, collector);
String streamId = "cpuUsageStream";
// StreamPartition, groupby col1 for stream cpuUsageStream
@@ -147,6 +147,7 @@ public class TestStreamRouterBolt {
queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
routerSpec.setTargetQueue(Collections.singletonList(queue));
boltSpec.addRouterSpec(routerSpec);
+ boltSpec.setVersion("version1");
// construct StreamDefinition
StreamDefinition schema = new StreamDefinition();
@@ -158,8 +159,8 @@ public class TestStreamRouterBolt {
Map<String, StreamDefinition> sds = new HashMap<>();
sds.put(schema.getStreamId(), schema);
- bolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
- bolt.onStreamRouteBoltSpecChange(boltSpec, sds);
+ routerBolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
+ routerBolt.onStreamRouteBoltSpecChange(boltSpec, sds);
GeneralTopologyContext context = mock(GeneralTopologyContext.class);
int taskId = 1;
when(context.getComponentId(taskId)).thenReturn("comp1");
@@ -185,7 +186,7 @@ public class TestStreamRouterBolt {
pEvent.setEvent(event);
pEvent.setPartition(sp);
Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
+ routerBolt.execute(input);
// construct another event with "value2"
event = new StreamEvent();
@@ -197,7 +198,7 @@ public class TestStreamRouterBolt {
pEvent.setPartition(sp);
pEvent.setEvent(event);
input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
+ routerBolt.execute(input);
// construct another event with "value3"
event = new StreamEvent();
@@ -209,7 +210,7 @@ public class TestStreamRouterBolt {
pEvent.setPartition(sp);
pEvent.setEvent(event);
input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
+ routerBolt.execute(input);
// construct another event with "value4"
event = new StreamEvent();
@@ -221,7 +222,7 @@ public class TestStreamRouterBolt {
pEvent.setPartition(sp);
pEvent.setEvent(event);
input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
+ routerBolt.execute(input);
// construct another event with "value5", which will be thrown because too late
event = new StreamEvent();
@@ -233,18 +234,20 @@ public class TestStreamRouterBolt {
pEvent.setPartition(sp);
pEvent.setEvent(event);
input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
+ routerBolt.execute(input);
Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
- Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt1",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt1"));
- Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt2",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt2"));
+ Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1",streamCollected.keySet().contains(
+ String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
+ Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2",streamCollected.keySet().contains(
+ String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
// The first 3 events are ticked automatically by window
- bolt.cleanup();
+ routerBolt.cleanup();
// Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
// The 5th event will be thrown because too late and out of margin
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/994a1e58/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 8712241..1799fa8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -54,6 +54,10 @@ public class MetadataResource {
// private IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
private final IMetadataDao dao;
+ public MetadataResource(){
+ this.dao = MetadataDaoFactory.getInstance().getMetadataDao();;
+ }
+
@Inject
public MetadataResource(IMetadataDao dao){
this.dao = dao;