You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/07 00:53:35 UTC
[5/6] incubator-eagle git commit: Rebase code base
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
index 2bca329..57529b6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
@@ -19,6 +19,10 @@
package org.apache.eagle.alert.engine;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.apache.eagle.alert.config.ZKConfig;
import org.apache.eagle.alert.config.ZKConfigBuilder;
import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
@@ -40,19 +44,43 @@ import com.typesafe.config.ConfigFactory;
*/
public class UnitTopologyMain {
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
+ // command line parse
+ Options options = new Options();
+ options.addOption("c", true,
+ "config URL (valid file name) - defaults application.conf according to typesafe config default behavior.");
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.hasOption("c")) {
+ String fileName = cmd.getOptionValue("c", "application.conf");
+ System.setProperty("config.resource", fileName.startsWith("/") ? fileName : "/" + fileName);
+ ConfigFactory.invalidateCaches();
+ }
Config config = ConfigFactory.load();
- ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
- String topologyId = config.getString("topology.name");
- ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
+ // load config and start
+ String topologyId = config.getString("topology.name");
+ ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
}
+
+ public static void runTopology(Config config, backtype.storm.Config stormConfig) {
+ // load config and start
+ String topologyId = config.getString("topology.name");
+ ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
+ new UnitTopologyRunner(changeNotifyService, stormConfig).run(topologyId, config);
+ }
- public static StormTopology createTopology(Config config) {
+ private static ZKMetadataChangeNotifyService createZKNotifyService(Config config, String topologyId) {
ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
- String topologyId = config.getString("topology.name");
ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId);
+ return changeNotifyService;
+ }
+
+ public static StormTopology createTopology(Config config) {
+ String topologyId = config.getString("topology.name");
+ ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index 2aa70e8..e8f736c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -20,13 +20,17 @@ import java.util.Map;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
public class PolicyStreamHandlers {
public static final String SIDDHI_ENGINE ="siddhi";
+ public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
if(SIDDHI_ENGINE.equals(type)) {
return new SiddhiPolicyHandler(sds);
+ }else if(NO_DATA_ALERT_ENGINE.equals(type)){
+ return new NoDataPolicyHandler(sds);
}
throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..8a681da
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.impl;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Since 6/28/16.
+ * to get distinct values within a specified time window
+ * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
+ * timeSortedMap : map sorted by timestamp first and then value
+ * With the above 2 data structure, we can get distinct values in LOG(N)
+ */
+public class DistinctValuesInTimeWindow {
+ public static class ValueAndTime{
+ Object value;
+ long timestamp;
+ public ValueAndTime(Object value, long timestamp){
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public String toString(){
+ return "[" + value + "," + timestamp + "]";
+ }
+
+ public int hashCode(){
+ return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
+ }
+
+ public boolean equals(Object that){
+ if(!(that instanceof ValueAndTime))
+ return false;
+ ValueAndTime another = (ValueAndTime)that;
+ return another.timestamp == this.timestamp && another.value.equals(this.value);
+ }
+ }
+
+ public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
+ @Override
+ public int compare(ValueAndTime o1, ValueAndTime o2) {
+ if(o1.timestamp != o2.timestamp)
+ return (o1.timestamp > o2.timestamp) ? 1 : -1;
+ if(o1.value.equals(o2.value))
+ return 0;
+ else {
+ // this is not strictly correct, but I don't want to write too many comparators here :-)
+ if(o1.hashCode() > o2.hashCode())
+ return 1;
+ else
+ return -1;
+ }
+ }
+ }
+
+ /**
+ * map from value to max timestamp for this value
+ */
+ private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+ /**
+ * map sorted by time(max timestamp for the value) and then value
+ */
+ private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
+ private long maxTimestamp = 0L;
+ private long window;
+ private boolean windowSlided;
+
+ /**
+ * @param window - milliseconds
+ */
+ public DistinctValuesInTimeWindow(long window){
+ this.window = window;
+ }
+
+ public void send(Object value, long timestamp){
+ ValueAndTime vt = new ValueAndTime(value, timestamp);
+
+ // todo think of time out of order
+ if(valueMaxTimeMap.containsKey(value)){
+ // remove that entry with old timestamp in timeSortedMap
+ long oldTime = valueMaxTimeMap.get(value);
+ if(oldTime >= timestamp){
+ // no any effect as the new timestamp is equal or even less than old timestamp
+ return;
+ }
+ timeSortedMap.remove(new ValueAndTime(value, oldTime));
+ }
+ // insert entry with new timestamp in timeSortedMap
+ timeSortedMap.put(vt, vt);
+ // update new timestamp in valueMaxTimeMap
+ valueMaxTimeMap.put(value, timestamp);
+
+ // evict old entries
+ // store max timestamp if possible
+ maxTimestamp = Math.max(maxTimestamp, timestamp);
+
+ // check if some values should be evicted because of time window
+ Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
+ while(it.hasNext()){
+ Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
+ if(entry.getKey().timestamp < maxTimestamp - window){
+ // should remove the entry in valueMaxTimeMap and timeSortedMap
+ valueMaxTimeMap.remove(entry.getKey().value);
+ windowSlided = true;
+
+ it.remove();
+ }else {
+ break;
+ }
+ }
+ }
+
+ public Map<Object, Long> distinctValues(){
+ return valueMaxTimeMap;
+ }
+
+ public boolean windowSlided(){
+ return windowSlided;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
index f063618..8a1f04a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
@@ -90,7 +90,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
policyStreamHandler.getValue().send(partitionedEvent.getEvent());
} catch (Exception e) {
this.context.counter().scope("fail_count").incr();
- LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent());
+ LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
index dfa5612..ed26408 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
@@ -76,7 +76,9 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
*/
@Override
public void receive(Event[] events) {
- LOG.info("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId());
+ }
for(Event e : events) {
AlertStreamEvent event = new AlertStreamEvent();
event.setTimestamp(e.getTimestamp());
@@ -131,6 +133,10 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler {
if(inputHandler != null){
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr();
inputHandler.send(event.getTimestamp(),event.getData());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("sent event to siddhi stream {} ", streamId);
+ }
}else{
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr();
LOG.warn("No input handler found for stream {}",streamId);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
new file mode 100644
index 0000000..ed13f71
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Since 6/28/16.
+ * No Data Policy engine
+ * based on the following information
+ * 1. stream definition: group by columns
+ * 2. timestamp field: timestamp column
+ * 3. wiri safe time window: how long window is good for full set of wiri
+ * 4. wisb: full set
+ *
+ * No data policy definition should include
+ * fixed fields and dynamic fields
+ * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
+ * dynamic fields depend on wisb type.
+ */
+public class NoDataPolicyHandler implements PolicyStreamHandler{
+ private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
+ private Map<String, StreamDefinition> sds;
+
+ // wisb(what is should be) set for expected full set value of multiple columns
+ @SuppressWarnings("rawtypes")
+ private volatile Set wisbValues = null;
+ private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
+ // reuse PolicyDefinition.defintion.value field to store full set of values separated by comma
+ private volatile PolicyDefinition policyDef;
+ private volatile DistinctValuesInTimeWindow distinctWindow;
+ private volatile Collector<AlertStreamEvent> collector;
+ private volatile PolicyHandlerContext context;
+ private volatile NoDataWisbType wisbType;
+
+ public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
+ this.sds = sds;
+ }
+ @Override
+ public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+ this.collector = collector;
+ this.context = context;
+ this.policyDef = context.getPolicyDefinition();
+ List<String> inputStreams = policyDef.getInputStreams();
+ // validate inputStreams has to contain only one stream
+ if(inputStreams.size() != 1)
+ throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
+ // validate outputStream has to contain only one stream
+ if(policyDef.getOutputStreams().size() != 1)
+ throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
+
+ String is = inputStreams.get(0);
+ StreamDefinition sd = sds.get(is);
+
+ String policyValue = policyDef.getDefinition().getValue();
+ // assume that no data alert policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value}
+ String[] segments = policyValue.split(",");
+ long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
+ distinctWindow = new DistinctValuesInTimeWindow(windowPeriod);
+ this.wisbType = NoDataWisbType.valueOf(segments[1]);
+ // for provided wisb values, need to parse, for dynamic wisb values, it is computed through a window
+ if(wisbType == NoDataWisbType.provided) {
+ wisbValues = new NoDataWisbProvidedParser().parse(segments);
+ }
+ // populate wisb field names
+ int numOfFields = Integer.parseInt(segments[2]);
+ for(int i = 3; i < 3+numOfFields; i++){
+ String fn = segments[i];
+ wisbFieldIndices.add(sd.getColumnIndex(fn));
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void send(StreamEvent event) throws Exception {
+ Object[] data = event.getData();
+ List<Object> columnValues = new ArrayList<>();
+ for(int i=0; i<wisbFieldIndices.size(); i++){
+ Object o = data[wisbFieldIndices.get(i)];
+ // convert value to string
+ columnValues.add(o.toString());
+ }
+ distinctWindow.send(columnValues, event.getTimestamp());
+ Set wiriValues = distinctWindow.distinctValues().keySet();
+
+ LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues());
+
+ if(distinctWindow.windowSlided()) {
+ compareAndEmit(wisbValues, wiriValues, event);
+ }
+
+ if(wisbType == NoDataWisbType.dynamic) {
+ // deep copy
+ wisbValues = new HashSet<>(wiriValues);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void compareAndEmit(Set wisb, Set wiri, StreamEvent event){
+ // compare with wisbValues if wisbValues are already there for dynamic type
+ Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
+ LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri);
+ if (noDataValues != null && noDataValues.size() > 0) {
+ LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisbValues);
+ AlertStreamEvent alertEvent = createAlertEvent(event.getTimestamp(), event.getData());
+ collector.emit(alertEvent);
+ }
+ }
+
+ private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent){
+ String is = policyDef.getInputStreams().get(0);
+ StreamDefinition sd = sds.get(is);
+
+ AlertStreamEvent event = new AlertStreamEvent();
+ event.setTimestamp(timestamp);
+ event.setData(triggerEvent);
+ event.setStreamId(policyDef.getOutputStreams().get(0));
+ event.setPolicy(context.getPolicyDefinition());
+ if (this.context.getParentEvaluator() != null) {
+ event.setCreatedBy(context.getParentEvaluator().getName());
+ }
+ event.setCreatedTime(System.currentTimeMillis());
+ event.setSchema(sd);
+ return event;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
new file mode 100644
index 0000000..fe06067
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Since 6/29/16.
+ */
+public interface NoDataWisbParser {
+ /**
+ * parse policy definition and return WISB values for one or multiple fields
+ * for example host and data center are 2 fields for no data alert, then WISB is a list of two values
+ * @param args some information parsed from policy defintion
+ * @return list of list of field values
+ */
+ Set<List<String>> parse(String[] args);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
new file mode 100644
index 0000000..e13826a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Since 6/29/16.
+ */
+public class NoDataWisbProvidedParser implements NoDataWisbParser{
+ @Override
+ /**
+ * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value"
+ */
+ public Set<List<String>> parse(String[] args) {
+ int numOfFields = Integer.parseInt(args[2]);
+ Set<List<String>> wisbValues = new HashSet<>();
+ int i = 3 + numOfFields;
+ while(i<args.length){
+ List<String> fields = new ArrayList<>();
+ for(int j=0; j<numOfFields; j++){
+ fields.add(args[i+j]);
+ }
+ wisbValues.add(fields);
+ i += numOfFields;
+ }
+ return wisbValues;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
new file mode 100644
index 0000000..887d099
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+/**
+ * Since 6/29/16.
+ */
+public enum NoDataWisbType {
+ provided,
+ dynamic
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
index 644fe2b..d24bdb0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
@@ -17,25 +17,29 @@
*/
package org.apache.eagle.alert.engine.publisher;
+import java.io.Closeable;
+import java.util.Map;
+
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
import com.typesafe.config.Config;
-import java.io.Closeable;
-import java.util.Map;
-
/**
* Created on 2/10/16.
* Notification Plug-in interface which provide abstraction layer to notify to different system
*/
public interface AlertPublishPlugin extends Closeable {
/**
- * for initialization
+ *
+ * @param config
+ * @param publishment
+ * @param configProperties - storm config that would be useful for some implementation
* @throws Exception
*/
- void init(Config config, Publishment publishment) throws Exception;
+ @SuppressWarnings("rawtypes")
+ void init(Config config, Publishment publishment, Map configProperties) throws Exception;
void update(String dedupIntervalMin, Map<String, String> pluginProperties);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
index 7a44009..5c0e597 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
@@ -2,6 +2,7 @@ package org.apache.eagle.alert.engine.publisher;
import java.io.Serializable;
+import java.util.Map;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
@@ -24,7 +25,8 @@ import com.typesafe.config.Config;
* limitations under the License.
*/
public interface AlertPublisher extends AlertPublishListener, Serializable {
- void init(Config config);
+ @SuppressWarnings("rawtypes")
+ void init(Config config, Map stormConfig);
String getName();
void nextEvent(AlertStreamEvent event);
void close();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
new file mode 100644
index 0000000..bd21415
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.slf4j.Logger;
+
+import com.typesafe.config.Config;
+
+/**
+ * @since Jun 3, 2016
+ *
+ */
+public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
+
+ protected AlertDeduplicator deduplicator;
+ protected PublishStatus status;
+ protected IEventSerializer serializer;
+ protected String pubName;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void init(Config config, Publishment publishment, Map conf) throws Exception {
+ this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+ this.pubName = publishment.getName();
+ String serializerClz = publishment.getSerializer();
+ try {
+ Object obj = Class.forName(serializerClz).getConstructor(Map.class).newInstance(conf);
+ if (!(obj instanceof IEventSerializer)) {
+ throw new Exception(String.format("serializer %s of publishement %s is not subclass to %s!",
+ publishment.getSerializer(),
+ publishment.getName(),
+ IEventSerializer.class.getName()));
+ }
+ serializer = (IEventSerializer) obj;
+ } catch (Exception e) {
+ getLogger().error(String.format("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage()), e);
+ serializer = new StringEventSerializer(conf);
+ }
+ }
+
+ @Override
+ public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+ deduplicator.setDedupIntervalMin(dedupIntervalMin);
+ }
+
+ @Override
+ public AlertStreamEvent dedup(AlertStreamEvent event) {
+ return deduplicator.dedup(event);
+ }
+
+ @Override
+ public PublishStatus getStatus() {
+ return status;
+ }
+
+ protected abstract Logger getLogger();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
deleted file mode 100644
index 2a4e332..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * Alert API entity Persistor
- */
-public class AlertEagleStorePersister {
- private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
- private Config config;
-
- public AlertEagleStorePersister(Config config) {
- this.config = config;
- }
-
- /**
- * Persist passes list of Entities
- * @param list
- * @return
- */
- public boolean doPersist(List<? extends StreamEvent> list) {
- if (list.isEmpty()) return false;
- LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
- try {
- IMetadataServiceClient client = new MetadataServiceClientImpl(config);
- // TODO: metadata service support
- }
- catch (Exception ex) {
- LOG.error("Got an exception in persisting entities", ex);
- return false;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
deleted file mode 100644
index 807aacc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * Plugin to persist alerts to Eagle Storage
- */
-public class AlertEagleStorePublisher implements AlertPublishPlugin {
-
- private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePublisher.class);
- private PublishStatus status;
- private AlertEagleStorePersister persist;
- private AlertDeduplicator deduplicator;
-
- @Override
- public void init(Config config, Publishment publishment) throws Exception {
- this.persist = new AlertEagleStorePersister(config);
- deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
- LOG.info("initialized plugin for EagleStorePlugin");
- }
-
- @Override
- public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
- deduplicator.setDedupIntervalMin(dedupIntervalMin);
- }
-
- @Override
- public PublishStatus getStatus() {
- return this.status;
- }
-
- @Override
- public AlertStreamEvent dedup(AlertStreamEvent event) {
- return deduplicator.dedup(event);
- }
-
- /**
- * Persist AlertEntity to alert_details table
- * @param event
- */
- @Override
- public void onAlert(AlertStreamEvent event) {
- LOG.info("write alert to eagle storage " + event);
- event = dedup(event);
- if(event == null) {
- return;
- }
- PublishStatus status = new PublishStatus();
- try{
- boolean result = persist.doPersist(Arrays.asList(event));
- if(result) {
- status.successful = true;
- status.errorMessage = "";
- }else{
- status.successful = false;
- status.errorMessage = "";
- }
- }catch (Exception ex ){
- status.successful = false;
- status.errorMessage = ex.getMessage();
- LOG.error("Fail writing alert entity to Eagle Store", ex);
- }
- this.status = status;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public int hashCode(){
- return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
- }
-
- @Override
- public boolean equals(Object o){
- if(o == this)
- return true;
- if(!(o instanceof AlertEagleStorePublisher))
- return false;
- return true;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index efe29bc..9d191c0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -27,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
@@ -37,27 +35,28 @@ import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
-public class AlertEmailPublisher implements AlertPublishPlugin {
+public class AlertEmailPublisher extends AbstractPublishPlugin {
private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
- private AlertEmailGenerator emailGenerator;
- private AlertDeduplicator deduplicator;
- private Map<String, String> emailConfig;
private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+
+ private AlertEmailGenerator emailGenerator;
+ private Map<String, String> emailConfig;
+
private transient ThreadPoolExecutor executorPool;
- private PublishStatus status;
@Override
- public void init(Config config, Publishment publishment) throws Exception {
+ @SuppressWarnings("rawtypes")
+ public void init(Config config, Publishment publishment, Map conf) throws Exception {
+ super.init(config, publishment, conf);
executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
LOG.info(" Creating Email Generator... ");
if (publishment.getProperties() != null) {
emailConfig = new HashMap<>(publishment.getProperties());
emailGenerator = createEmailGenerator(emailConfig);
}
- deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
}
@Override
@@ -84,7 +83,8 @@ public class AlertEmailPublisher implements AlertPublishPlugin {
@Override
public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
- deduplicator.setDedupIntervalMin(dedupIntervalMin);
+ super.update(dedupIntervalMin, pluginProperties);
+
if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) {
emailConfig = new HashMap<>(pluginProperties);
emailGenerator = createEmailGenerator(pluginProperties);
@@ -96,16 +96,6 @@ public class AlertEmailPublisher implements AlertPublishPlugin {
this.executorPool.shutdown();
}
- @Override
- public PublishStatus getStatus() {
- return this.status;
- }
-
- @Override
- public AlertStreamEvent dedup(AlertStreamEvent event) {
- return deduplicator.dedup(event);
- }
-
/**
* @param notificationConfig
* @return
@@ -148,4 +138,9 @@ public class AlertEmailPublisher implements AlertPublishPlugin {
return false;
return true;
}
+
+ @Override
+ protected Logger getLogger() {
+ return LOG;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index ea65298..2566f79 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -36,30 +34,30 @@ import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
-public class AlertKafkaPublisher implements AlertPublishPlugin {
+public class AlertKafkaPublisher extends AbstractPublishPlugin {
private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
- private AlertDeduplicator deduplicator;
- private PublishStatus status;
+ private static final long MAX_TIMEOUT_MS = 60000;
+
@SuppressWarnings("rawtypes")
private KafkaProducer producer;
private String brokerList;
private String topic;
- private final static long MAX_TIMEOUT_MS =60000;
-
@Override
- public void init(Config config, Publishment publishment) throws Exception {
- deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+ @SuppressWarnings("rawtypes")
+ public void init(Config config, Publishment publishment, Map conf) throws Exception {
+ super.init(config, publishment, conf);
+
if (publishment.getProperties() != null) {
Map<String, String> kafkaConfig = new HashMap<>(publishment.getProperties());
brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
- producer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+ producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig);
topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
}
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onAlert(AlertStreamEvent event) throws Exception {
if (producer == null) {
@@ -72,7 +70,12 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
}
PublishStatus status = new PublishStatus();
try {
- Future<?> future = producer.send(createRecord(event, topic));
+ ProducerRecord record = createRecord(event, topic);
+ if (record == null) {
+ LOG.error(" Alert serialize return null, ignored message! ");
+ return;
+ }
+ Future<?> future = producer.send(record);
future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
status.successful = true;
status.errorMessage = "";
@@ -89,6 +92,7 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
this.status = status;
}
+ @SuppressWarnings("rawtypes")
@Override
public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
deduplicator.setDedupIntervalMin(dedupIntervalMin);
@@ -99,7 +103,7 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
brokerList = newBrokerList;
KafkaProducer newProducer = null;
try {
- newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+ newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties);
} catch (Exception e) {
LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
}
@@ -113,24 +117,22 @@ public class AlertKafkaPublisher implements AlertPublishPlugin {
producer.close();
}
- /**
- * To Create KafkaProducer Record
- * @param event
- * @return ProducerRecord
- * @throws Exception
- */
- private ProducerRecord<String, String> createRecord(AlertStreamEvent event, String topic) throws Exception {
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.toString());
- return record;
+ private ProducerRecord<String, Object> createRecord(AlertStreamEvent event, String topic) throws Exception {
+ Object o = serialzeEvent(event);
+ if (o != null) {
+ ProducerRecord<String, Object> record = new ProducerRecord<>(topic, o);
+ return record;
+ } else {
+ return null;
+ }
}
- @Override
- public PublishStatus getStatus() {
- return this.status;
+ private Object serialzeEvent(AlertStreamEvent event) {
+ return serializer.serialize(event);
}
@Override
- public AlertStreamEvent dedup(AlertStreamEvent event) {
- return this.deduplicator.dedup(event);
+ protected Logger getLogger() {
+ return LOG;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
index f538088..82be5a0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
@@ -18,6 +18,8 @@
package org.apache.eagle.alert.engine.publisher.impl;
+import java.util.Map;
+
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
import org.slf4j.Logger;
@@ -32,12 +34,13 @@ public class AlertPublishPluginsFactory {
private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class);
- public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config) {
+ @SuppressWarnings("rawtypes")
+ public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config, Map conf) {
AlertPublishPlugin plugin = null;
String publisherType = publishment.getType();
try {
plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance();
- plugin.init(config, publishment);
+ plugin.init(config, publishment, conf);
} catch (Exception ex) {
LOG.error("Error in loading AlertPublisherPlugin class: ", ex);
//throw new IllegalStateException(ex);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index fce22f1..6baa616 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
+@SuppressWarnings("rawtypes")
public class AlertPublisherImpl implements AlertPublisher {
private static final long serialVersionUID = 4809983246198138865L;
private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
@@ -40,14 +41,16 @@ public class AlertPublisherImpl implements AlertPublisher {
private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1);
private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
private Config config;
+ private Map conf;
public AlertPublisherImpl(String name) {
this.name = name;
}
@Override
- public void init(Config config) {
+ public void init(Config config, Map conf) {
this.config = config;
+ this.conf = conf;
}
@Override
@@ -84,6 +87,7 @@ public class AlertPublisherImpl implements AlertPublisher {
publishPluginMapping.values().forEach(plugin -> plugin.close());
}
+ @SuppressWarnings("unchecked")
@Override
public void onPublishChange(List<Publishment> added,
List<Publishment> removed,
@@ -100,7 +104,7 @@ public class AlertPublisherImpl implements AlertPublisher {
}
for (Publishment publishment : added) {
- AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config);
+ AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
if(plugin != null) {
publishPluginMapping.put(publishment.getName(), plugin);
onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
index e8964a8..6b7fc61 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
@@ -17,26 +17,56 @@
*/
package org.apache.eagle.alert.engine.publisher.impl;
+import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
/**
- * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than
+ * having multiple instances.
*/
public class KafkaProducerManager {
- public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
-
- public KafkaProducer<String, Object> getProducer(String brokerList) {
- Properties configMap = new Properties();
- configMap.put("bootstrap.servers", brokerList);
- configMap.put("metadata.broker.list", brokerList);
- configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- configMap.put("request.required.acks", "1");
- configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
- configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
- KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
- return producer;
- }
+
+ private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String VALUE_DESERIALIZER = "value.deserializer";
+ private static final String KEY_DESERIALIZER = "key.deserializer";
+ private static final String VALUE_SERIALIZER = "value.serializer";
+ private static final String KEY_SERIALIZER = "key.serializer";
+
+ public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
+
+ public KafkaProducer<String, Object> getProducer(String brokerList, Map<String, String> kafkaConfig) {
+ Properties configMap = new Properties();
+ configMap.put("bootstrap.servers", brokerList);
+ configMap.put("metadata.broker.list", brokerList);
+
+ if (kafkaConfig.containsKey(KEY_SERIALIZER)) {
+ configMap.put(KEY_SERIALIZER, kafkaConfig.get(KEY_SERIALIZER));
+ } else {
+ configMap.put(KEY_SERIALIZER, STRING_SERIALIZER);
+ }
+
+ if (kafkaConfig.containsKey(VALUE_SERIALIZER)) {
+ configMap.put(VALUE_SERIALIZER, kafkaConfig.get(VALUE_SERIALIZER));
+ } else {
+ configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER);
+ }
+ configMap.put("request.required.acks", "1");
+
+ if (kafkaConfig.containsKey(KEY_DESERIALIZER)) {
+ configMap.put(KEY_DESERIALIZER, kafkaConfig.get(KEY_DESERIALIZER));
+ } else {
+ configMap.put(KEY_DESERIALIZER, STRING_SERIALIZER);
+ }
+
+ if (kafkaConfig.containsKey(VALUE_DESERIALIZER)) {
+ configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER));
+ } else {
+ configMap.put(VALUE_DESERIALIZER, STRING_SERIALIZER);
+ }
+
+ KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+ return producer;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
new file mode 100644
index 0000000..012ebaa
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+/**
+ * @since Jun 3, 2016
+ *
+ */
+public class StringEventSerializer implements IEventSerializer {
+
+ @SuppressWarnings("rawtypes")
+ public StringEventSerializer(Map stormConf) {
+ }
+
+ @Override
+ public Object serialize(AlertStreamEvent event) {
+ return event.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index db235a7..28d2d22 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -57,7 +57,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
private final OutputCollector outputCollector;
private final Object outputLock = new Object();
- private final List<String> outputStreamIds;
+// private final List<String> outputStreamIds;
private final StreamContext streamContext;
private final PartitionedEventSerializer serializer;
private volatile Map<StreamPartition,StreamRouterSpec> routeSpecMap;
@@ -69,7 +69,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
this.outputCollector = outputCollector;
this.routeSpecMap = new HashMap<>();
this.routePartitionerMap = new HashMap<>();
- this.outputStreamIds = outputStreamIds;
+// this.outputStreamIds = outputStreamIds;
this.streamContext = streamContext;
this.serializer = serializer;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
index cc819ba..131d85a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
@@ -40,6 +40,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
private Config config;
private List<String> outputStreamIds;
protected OutputCollector collector;
+ protected Map stormConf;
public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){
this.changeNotifyService = changeNotifyService;
@@ -56,6 +57,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.stormConf = stormConf;
Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet");
this.collector = collector;
internalPrepare(collector,this.changeNotifyService,this.config,context);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 30ff5f0..e53b0ba 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -134,7 +134,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
public void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) {
List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId);
if(newPolicies == null) {
- LOG.info("no policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
+ LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId);
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 0a239e2..768cf48 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -53,13 +53,13 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){
super(coordinatorService, config);
this.alertPublisher = alertPublisher;
- this.alertPublisher.init(config);
}
@Override
public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) {
coordinatorService.registerListener(this);
coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT);
+ this.alertPublisher.init(config, stormConf);
streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 942ef97..85c2f73 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -16,18 +16,26 @@
*/
package org.apache.eagle.alert.engine.runner;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
import org.apache.eagle.alert.engine.StreamContext;
import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.router.StreamRouter;
import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
@@ -39,8 +47,12 @@ import org.apache.eagle.alert.utils.AlertConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.*;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+
+import com.typesafe.config.Config;
public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{
private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index cef94c7..5a937f2 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
@@ -67,11 +68,17 @@ public class UnitTopologyRunner {
public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
private final IMetadataChangeNotifyService metadataChangeNotifyService;
+ private backtype.storm.Config givenStormConfig = null;
public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){
this.metadataChangeNotifyService = metadataChangeNotifyService;
}
+ public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, backtype.storm.Config stormConfig) {
+ this(changeNotifyService);
+ this.givenStormConfig = stormConfig;
+ }
+
public StormTopology buildTopology(String topologyId,
int numOfSpoutTasks,
int numOfRouterBolts,
@@ -148,7 +155,7 @@ public class UnitTopologyRunner {
return builder.createTopology();
}
- public void run(String topologyId,
+ private void run(String topologyId,
int numOfTotalWorkers,
int numOfSpoutTasks,
int numOfRouterBolts,
@@ -156,7 +163,8 @@ public class UnitTopologyRunner {
int numOfPublishTasks,
Config config,
boolean localMode) {
- backtype.storm.Config stormConfig = new backtype.storm.Config();
+
+ backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig;
// TODO: Configurable metric consumer instance number
int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
@@ -186,11 +194,6 @@ public class UnitTopologyRunner {
}
}
- public void run(Config config) {
- String topologyId = config.getString("topology.name");
- run(topologyId,config);
- }
-
public void run(String topologyId,Config config) {
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
index a3487d3..c1da90f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
@@ -40,7 +40,8 @@ public class JsonScheme implements Scheme {
private String topic;
- public JsonScheme(String topic) {
+ @SuppressWarnings("rawtypes")
+ public JsonScheme(String topic, Map conf) {
this.topic = topic;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
index 89d2e76..194b0c2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
@@ -41,7 +41,8 @@ public class PlainStringScheme implements Scheme {
private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class);
private String topic;
- public PlainStringScheme(String topic){
+ @SuppressWarnings("rawtypes")
+ public PlainStringScheme(String topic, Map conf){
this.topic = topic;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
index 5ba1080..03c1dfb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
@@ -16,16 +16,16 @@
*/
package org.apache.eagle.alert.engine.serialization;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
/**
* TODO: Seams the complexity dosen't bring enough performance improve
*
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
index c518e40..f653361 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
@@ -16,10 +16,10 @@
*/
package org.apache.eagle.alert.engine.serialization;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
import java.io.IOException;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
public interface PartitionedEventSerializer {
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
index a94604c..6be8f1a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
@@ -16,12 +16,19 @@
*/
package org.apache.eagle.alert.engine.serialization;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.serialization.impl.*;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.serialization.impl.BooleanSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.DoubleSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.FloatSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.IntegerSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.JavaObjectSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.LongSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
+import org.apache.eagle.alert.engine.serialization.impl.StringSerializer;
+
public class Serializers {
private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
@@ -32,6 +39,7 @@ public class Serializers {
COLUMN_TYPE_SER_MAPPING.put(type,serializer);
}
+ @SuppressWarnings("unchecked")
public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){
if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
index 1e90569..db91a70 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
index ad5f53c..f2f5359 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with