You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/02 08:34:42 UTC
incubator-eagle git commit: EAGLE-519: no data alert enhancement
Repository: incubator-eagle
Updated Branches:
refs/heads/develop b2b16b745 -> ebbaad091
EAGLE-519: no data alert enhancement
Author: Li, Garrett
Reviewer: ralphsu
This closes #413
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/ebbaad09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/ebbaad09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/ebbaad09
Branch: refs/heads/develop
Commit: ebbaad091b797945efa7147698de554a53117955
Parents: b2b16b7
Author: Ralph, Su <su...@gmail.com>
Authored: Fri Sep 2 16:33:23 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Sep 2 16:33:23 2016 +0800
----------------------------------------------------------------------
.../engine/coordinator/PolicyDefinition.java | 3 +-
.../alert/engine/coordinator/StreamColumn.java | 25 +-
.../eagle/alert/coordinator/Coordinator.java | 2 +-
.../coordinator/impl/GreedyPolicyScheduler.java | 4 +
.../provider/NodataMetadataGenerator.java | 343 +++++++++++++++++++
.../provider/ScheduleContextBuilder.java | 14 +-
.../coordinator/trigger/CoordinatorTrigger.java | 2 +-
.../NodataMetadataGeneratorTest.java | 103 ++++++
.../coordinator/ScheduleContextBuilderTest.java | 93 ++++-
.../TestGreedyScheduleCoordinator.java | 131 +++++++
.../src/test/resources/application.conf | 9 +
.../src/test/resources/test-application.conf | 9 +
.../engine/evaluator/PolicyStreamHandlers.java | 9 +-
.../nodata/DistinctValuesInTimeBatchWindow.java | 128 +++++++
.../nodata/NoDataPolicyTimeBatchHandler.java | 168 +++++++++
.../publisher/impl/JsonEventSerializer.java | 2 +-
.../AttributeCollectWithDistinctAggregator.java | 124 +++++++
.../src/main/resources/eagle.siddhiext | 1 +
.../TestDistinctValuesInTimeBatchWindow.java | 89 +++++
.../TestNoDataPolicyTimeBatchHandler.java | 158 +++++++++
.../src/test/resources/eagle.siddhiext | 18 +
.../simple/application-integration.conf | 16 +-
.../src/test/resources/simple/topologies.json | 14 +-
.../src/main/resources/application.conf | 11 +-
24 files changed, 1442 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 0dca247..bc389a2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -144,7 +144,8 @@ public class PolicyDefinition implements Serializable{
if(! (that instanceof PolicyDefinition))
return false;
PolicyDefinition another = (PolicyDefinition)that;
- if(another.name.equals(this.name) &&
+ if(Objects.equals(another.name, this.name) &&
+ Objects.equals(another.description, this.description) &&
CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) &&
CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) &&
another.definition.equals(this.definition) &&
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index b11729d..c0d355e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -16,15 +16,15 @@
*/
package org.apache.eagle.alert.engine.coordinator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import javax.xml.bind.annotation.XmlType;
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
public class StreamColumn implements Serializable {
private static final long serialVersionUID = -5457861313624389106L;
@@ -33,13 +33,22 @@ public class StreamColumn implements Serializable {
private Object defaultValue;
private boolean required;
private String description;
+ private String nodataExpression;
public String toString() {
- return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s]", name, type,
- defaultValue, required);
+ return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
+ name, type, defaultValue, required, nodataExpression);
}
- public String getName() {
+ public String getNodataExpression() {
+ return nodataExpression;
+ }
+
+ public void setNodataExpression(String nodataExpression) {
+ this.nodataExpression = nodataExpression;
+ }
+
+ public String getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index f46e4c2..5c455f6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -128,7 +128,7 @@ public class Coordinator {
ScheduleState state = null;
try {
Stopwatch watch = Stopwatch.createStarted();
- IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+ IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
TopologyMgmtService mgmtService = new TopologyMgmtService();
IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index 6c98fa6..ebc533e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -298,6 +298,10 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
* @return
*/
private int getQueueSize(int hint) {
+ if (hint == 0) {
+ // some policies require single bolt to execute
+ return 1;
+ }
return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
new file mode 100644
index 0000000..67dedeb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class NodataMetadataGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGenerator.class);
+
+ private static final String NODATA_ALERT_AGGR_STREAM = "nodata_alert_aggregation_stream";
+ private static final String NODATA_ALERT_AGGR_OUTPUT_STREAM = "nodata_alert_aggregation_output_stream";
+ private static final String NODATA_ALERT_AGGR_DATASOURCE_NAME = "nodata_alert_aggregation_ds";
+ private static final String NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME = "nodata_alert_aggregation_output_ds";
+ private static final String NODATA_ALERT_AGGR_TOPIC_NAME = "nodata_alert_aggregation";
+ private static final String NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME = "nodata_alert";
+
+ private static final String DATASOURCE_TYPE = "KAFKA";
+ private static final String DATASOURCE_SCHEME_CLS = "org.apache.eagle.alert.engine.scheme.JsonScheme";
+
+ private static final String NODATA_ALERT_AGGR_POLICY_TYPE = "nodataalert";
+ private static final String NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE = "siddhi";
+
+ private static final String JSON_STRING_STREAM_NAME_SELECTOR_CLS = "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector";
+ private static final String STREAM_TIMESTAMP_COLUMN_NAME = "timestamp";
+ private static final String STREAM_TIMESTAMP_FORMAT = "";
+
+ private static final String KAFKA_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher";
+ private static final String EMAIL_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher";
+
+ private static final String PUBLISHMENT_DEDUP_DURATION = "PT0M";
+ private static final String PUBLISHMENT_SERIALIZER = "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer";
+
+ public NodataMetadataGenerator() {}
+
+ public void execute(Config config, Map<String, StreamDefinition> streamDefinitionsMap,
+ Map<String, Kafka2TupleMetadata> kafkaSources,
+ Map<String, PolicyDefinition> policies, Map<String, Publishment> publishments) {
+ Collection<StreamDefinition> streamDefinitions = streamDefinitionsMap.values();
+ for (StreamDefinition streamDefinition : streamDefinitions) {
+ StreamColumn columnWithNodataExpression = null;
+ for (StreamColumn column : streamDefinition.getColumns()) {
+ if (StringUtils.isNotBlank(column.getNodataExpression())) {
+ // has nodata alert setting, needs to generate the nodata alert policy
+ if (columnWithNodataExpression != null) {
+ columnWithNodataExpression = null;
+ LOG.warn("Only one column in one stream is allowed to configure nodata alert");
+ break;
+ }
+ columnWithNodataExpression = column;
+ }
+ }
+ if (columnWithNodataExpression != null) {
+ String streamName = streamDefinition.getStreamId();
+
+ // create nodata alert aggr stream
+ if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_STREAM)) {
+ LOG.info("Nodata alert aggregation stream: {} already exists", NODATA_ALERT_AGGR_STREAM);
+ } else {
+ streamDefinitionsMap.put(NODATA_ALERT_AGGR_STREAM, buildAggregationStream());
+ LOG.info("Created nodata alert aggregation stream: {}", NODATA_ALERT_AGGR_STREAM);
+ }
+
+ // create nodata alert aggr output stream
+ if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_OUTPUT_STREAM)) {
+ LOG.info("Nodata alert aggregation output stream: {} already exists", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ } else {
+ streamDefinitionsMap.put(NODATA_ALERT_AGGR_OUTPUT_STREAM, buildAggregationOutputStream());
+ LOG.info("Created nodata alert aggregation output stream: {}", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ }
+
+ // create nodata alert data source
+ if (kafkaSources.containsKey(NODATA_ALERT_AGGR_DATASOURCE_NAME)) {
+ LOG.info("Stream: {} nodata alert aggregation datasource: {} already exists",
+ NODATA_ALERT_AGGR_STREAM, NODATA_ALERT_AGGR_DATASOURCE_NAME);
+ } else {
+ kafkaSources.put(NODATA_ALERT_AGGR_DATASOURCE_NAME, buildAggregationDatasource());
+ LOG.info("Created nodata alert aggregation datasource {} for stream {}",
+ NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_STREAM);
+ }
+
+ // create nodata alert aggregation output datasource
+ if (kafkaSources.containsKey(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME)) {
+ LOG.info("Stream: {} nodata alert aggregation output datasource: {} already exists",
+ NODATA_ALERT_AGGR_OUTPUT_STREAM, NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+ } else {
+ kafkaSources.put(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, buildAggregationOutputDatasource());
+ LOG.info("Created nodata alert aggregation output datasource {} for stream {}",
+ NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ }
+
+ // create nodata alert policy
+ String policyName = streamName + "_nodata_alert";
+ String nodataExpression = columnWithNodataExpression.getNodataExpression();
+ String[] segments = nodataExpression.split(",");
+ long windowPeriodInSeconds = TimePeriodUtils.getSecondsOfPeriod(Period.parse(segments[0]));
+ if (policies.containsKey(policyName)) {
+ LOG.info("Stream: {} nodata alert policy: {} already exists", streamName, policyName);
+ } else {
+ policies.put(policyName, buildDynamicNodataPolicy(
+ streamName,
+ policyName,
+ columnWithNodataExpression.getName(),
+ nodataExpression,
+ Arrays.asList(streamName)));
+ LOG.info("Created nodata alert policy {} with expression {} for stream {}",
+ policyName, nodataExpression, streamName);
+ }
+
+ // create nodata alert aggregation
+ String aggrPolicyName = NODATA_ALERT_AGGR_STREAM + "_policy";
+ if (policies.containsKey(aggrPolicyName)) {
+ LOG.info("Stream: {} nodata alert aggregation policy: {} already exists",
+ NODATA_ALERT_AGGR_OUTPUT_STREAM, aggrPolicyName);
+ } else {
+ policies.put(aggrPolicyName, buildAggregationPolicy(
+ aggrPolicyName,
+ columnWithNodataExpression.getName(),
+ windowPeriodInSeconds));
+ LOG.info("Created nodata alert aggregation policy {} for stream {}",
+ aggrPolicyName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ }
+
+ // create nodata alert publish
+ String publishmentName = policyName + "_publish";
+ if (publishments.containsKey(publishmentName)) {
+ LOG.info("Stream: {} nodata alert publishment: {} already exists", streamName, publishmentName);
+ } else {
+ String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
+ publishments.put(publishmentName, buildKafkaAlertPublishment(
+ publishmentName, policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
+ publishments.put(publishmentName + "_email", buildEmailAlertPublishment(config,
+ publishmentName + "_email", policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
+ LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName);
+ }
+
+ // create nodata alert aggregation publish
+ String aggrPublishName = aggrPolicyName + "_publish";
+ if (publishments.containsKey(aggrPublishName)) {
+ LOG.info("Stream: {} publishment: {} already exists", NODATA_ALERT_AGGR_STREAM, aggrPublishName);
+ } else {
+ String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
+ publishments.put(aggrPublishName, buildKafkaAlertPublishment(
+ aggrPublishName, aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
+ publishments.put(aggrPublishName + "_email", buildEmailAlertPublishment(config,
+ aggrPublishName + "_email", aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
+ LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName);
+ }
+ }
+ }
+ }
+
+ private Kafka2TupleMetadata buildAggregationDatasource() {
+ Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+ datasource.setName(NODATA_ALERT_AGGR_DATASOURCE_NAME);
+ datasource.setType(DATASOURCE_TYPE);
+ datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
+ datasource.setTopic(NODATA_ALERT_AGGR_TOPIC_NAME);
+ Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+ codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
+ codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
+ codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
+ Properties codecProperties = new Properties();
+ codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_STREAM);
+ codecProperties.put("streamNameFormat", "%s");
+ codec.setStreamNameSelectorProp(codecProperties);
+ datasource.setCodec(codec);
+ return datasource;
+ }
+
+ private Kafka2TupleMetadata buildAggregationOutputDatasource() {
+ Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+ datasource.setName(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+ datasource.setType(DATASOURCE_TYPE);
+ datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
+ datasource.setTopic(NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME);
+ Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+ codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
+ codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
+ codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
+ Properties codecProperties = new Properties();
+ codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ codecProperties.put("streamNameFormat", "%s");
+ codec.setStreamNameSelectorProp(codecProperties);
+ datasource.setCodec(codec);
+ return datasource;
+ }
+
+ private PolicyDefinition buildDynamicNodataPolicy(String streamName, String policyName,
+ String columnName, String expression, List<String> inputStream) {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ //expression, something like "PT5S,dynamic,1,host"
+ def.setValue(expression);
+ def.setType(NODATA_ALERT_AGGR_POLICY_TYPE);
+ pd.setDefinition(def);
+ pd.setInputStreams(inputStream);
+ pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
+ pd.setName(policyName);
+ pd.setDescription(String.format("Nodata alert policy for stream %s", streamName));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId(streamName);
+ sp.setColumns(Arrays.asList(columnName));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ return pd;
+ }
+
+ private PolicyDefinition buildAggregationPolicy(String policyName, String columnName,
+ long windowPeriodInSeconds) {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ String SiddhiQL = String.format(
+ "from %s#window.timeBatch(%s sec) select eagle:collectWithDistinct(%s) as hosts, "
+ + "originalStreamName as streamName group by originalStreamName insert into %s",
+ NODATA_ALERT_AGGR_STREAM, windowPeriodInSeconds * 2,
+ columnName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ LOG.info("Generated SiddhiQL {} for stream: {}", SiddhiQL, NODATA_ALERT_AGGR_STREAM);
+ def.setValue(SiddhiQL);
+ def.setType(NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE);
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
+ pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_OUTPUT_STREAM));
+ pd.setName(policyName);
+ pd.setDescription("Nodata alert aggregation policy, used to merge alerts from multiple bolts");
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId(NODATA_ALERT_AGGR_STREAM);
+ sp.setColumns(Arrays.asList(columnName));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ pd.setParallelismHint(0);
+ return pd;
+ }
+
+ private Publishment buildKafkaAlertPublishment(String publishmentName, String policyName, String kafkaBroker, String topic) {
+ Publishment publishment = new Publishment();
+ publishment.setName(publishmentName);
+ publishment.setType(KAFKA_PUBLISHMENT_TYPE);
+ publishment.setPolicyIds(Arrays.asList(policyName));
+ publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
+ Map<String, String> publishmentProperties = new HashMap<String, String>();
+ publishmentProperties.put("kafka_broker", kafkaBroker);
+ publishmentProperties.put("topic", topic);
+ publishment.setProperties(publishmentProperties);
+ publishment.setSerializer(PUBLISHMENT_SERIALIZER);
+ return publishment;
+ }
+
+ private Publishment buildEmailAlertPublishment(Config config,
+ String publishmentName, String policyName, String kafkaBroker, String topic) {
+ Publishment publishment = new Publishment();
+ publishment.setName(publishmentName);
+ publishment.setType(EMAIL_PUBLISHMENT_TYPE);
+ publishment.setPolicyIds(Arrays.asList(policyName));
+ publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
+ Map<String, String> publishmentProperties = new HashMap<String, String>();
+ publishmentProperties.put("subject", String.format("Eagle Alert - %s", topic));
+ publishmentProperties.put("template", "");
+ publishmentProperties.put("sender", config.getString("email.sender"));
+ publishmentProperties.put("recipients", config.getString("email.recipients"));
+ publishmentProperties.put("mail.smtp.host", config.getString("email.mailSmtpHost"));
+ publishmentProperties.put("mail.smtp.port", config.getString("email.mailSmtpPort"));
+ publishmentProperties.put("connection", "plaintext");
+ publishment.setProperties(publishmentProperties);
+ publishment.setSerializer(PUBLISHMENT_SERIALIZER);
+ return publishment;
+ }
+
+ private StreamDefinition buildAggregationStream() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn originalStreamNameColumn = new StreamColumn();
+ originalStreamNameColumn.setName("originalStreamName");
+ originalStreamNameColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, originalStreamNameColumn));
+ sd.setDataSource(NODATA_ALERT_AGGR_DATASOURCE_NAME);
+ sd.setStreamId(NODATA_ALERT_AGGR_STREAM);
+ sd.setDescription("Nodata alert aggregation stream");
+ return sd;
+ }
+
+ private StreamDefinition buildAggregationOutputStream() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("hosts");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn osnColumn = new StreamColumn();
+ osnColumn.setName("streamName");
+ osnColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(hostColumn, osnColumn));
+ sd.setDataSource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+ sd.setStreamId(NODATA_ALERT_AGGR_OUTPUT_STREAM);
+ sd.setDescription("Nodata alert aggregation output stream");
+ return sd;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index 9013f04..dd38395 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -61,6 +61,7 @@ public class ScheduleContextBuilder {
private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class);
private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
+ private Config config;
private IMetadataServiceClient client;
private Map<String, Topology> topologies;
@@ -73,10 +74,12 @@ public class ScheduleContextBuilder {
private Map<String, TopologyUsage> usages;
public ScheduleContextBuilder(Config config) {
+ this.config = config;
client = new MetadataServiceClientImpl(config);
}
- public ScheduleContextBuilder(IMetadataServiceClient client) {
+ public ScheduleContextBuilder(Config config, IMetadataServiceClient client) {
+ this.config = config;
this.client = client;
}
@@ -89,10 +92,13 @@ public class ScheduleContextBuilder {
topologies = listToMap(client.listTopologies());
kafkaSources = listToMap(client.listDataSources());
// filter out disabled policies
- policies = listToMap(client.listPolicies().stream().filter(
- (t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList()));
+ List<PolicyDefinition> enabledPolicies = client.listPolicies().stream().filter(
+ (t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList());
+ policies = listToMap(enabledPolicies);
publishments = listToMap(client.listPublishment());
streamDefinitions = listToMap(client.listStreams());
+ // generate data sources, policies, publishments for nodata alert
+ new NodataMetadataGenerator().execute(config, streamDefinitions, kafkaSources, policies, publishments);
// TODO: See ScheduleState comments on how to improve the storage
ScheduleState state = client.getVersionedSpec();
@@ -114,7 +120,7 @@ public class ScheduleContextBuilder {
return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
streamDefinitions, monitoredStreamMap, usages);
}
-
+
/**
* 1.
* <pre>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
index c489a0e..4e17179 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
@@ -64,7 +64,7 @@ public class CoordinatorTrigger implements Runnable {
Stopwatch watch = Stopwatch.createStarted();
// schedule
- IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+ IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
TopologyMgmtService mgmtService = new TopologyMgmtService();
IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
new file mode 100644
index 0000000..43dc9c1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordinator.provider.NodataMetadataGenerator;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class NodataMetadataGeneratorTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGeneratorTest.class);
+
+ Config config = ConfigFactory.load().getConfig("coordinator");
+ private NodataMetadataGenerator generator;
+
+ @Before
+ public void setup() {
+ generator = new NodataMetadataGenerator();
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+ StreamDefinition sd = createStreamDefinitionWithNodataAlert();
+ Map<String, StreamDefinition> streamDefinitionsMap = new HashMap<String, StreamDefinition>();
+ streamDefinitionsMap.put(sd.getStreamId(), sd);
+
+ Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<String, Kafka2TupleMetadata>();
+ Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
+ Map<String, Publishment> publishments = new HashMap<String, Publishment>();
+
+ generator.execute(config, streamDefinitionsMap, kafkaSources, policies, publishments);
+
+ Assert.assertEquals(2, kafkaSources.size());
+
+ kafkaSources.forEach((key, value) -> {
+ LOG.info("KafkaSources > {}: {}", key, ToStringBuilder.reflectionToString(value));
+ });
+
+ Assert.assertEquals(2, policies.size());
+
+ policies.forEach((key, value) -> {
+ LOG.info("Policies > {}: {}", key, ToStringBuilder.reflectionToString(value));
+ });
+
+ Assert.assertEquals(4, publishments.size());
+
+ publishments.forEach((key, value) -> {
+ LOG.info("Publishments > {}: {}", key, ToStringBuilder.reflectionToString(value));
+ });
+ }
+
+ private StreamDefinition createStreamDefinitionWithNodataAlert() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+ hostColumn.setNodataExpression("PT1M,dynamic,1,host");
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("value");
+ valueColumn.setType(StreamColumn.Type.DOUBLE);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testStreamId");
+ return sd;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
index ed9d7b7..84153f6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
@@ -48,17 +49,22 @@ import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.junit.Assert;
import org.junit.Test;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
/**
* @since May 5, 2016
*
*/
public class ScheduleContextBuilderTest {
+ Config config = ConfigFactory.load().getConfig("coordinator");
+
@Test
public void test() {
InMemMetadataServiceClient client = getSampleMetadataService();
- ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
IScheduleContext context = builder.buildContext();
@@ -84,7 +90,7 @@ public class ScheduleContextBuilderTest {
@Test
public void test_remove_policy() {
InMemMetadataServiceClient client = getSampleMetadataService();
- ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
@@ -105,7 +111,7 @@ public class ScheduleContextBuilderTest {
@Test
public void test_changed_policy_partition() {
InMemMetadataServiceClient client = getSampleMetadataService();
- ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
IScheduleContext context = builder.buildContext();
@@ -143,7 +149,7 @@ public class ScheduleContextBuilderTest {
@Test
public void test_changed_policy_parallelism() {
InMemMetadataServiceClient client = getSampleMetadataService();
- ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
IScheduleContext context = builder.buildContext();
@@ -171,7 +177,7 @@ public class ScheduleContextBuilderTest {
@Test
public void test_changed_policy_definition() {
InMemMetadataServiceClient client = getSampleMetadataService();
- ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
IScheduleContext context = builder.buildContext();
@@ -189,11 +195,59 @@ public class ScheduleContextBuilderTest {
// just to make sure queueNew is present
Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
}
+
+ @Test
+ public void test_stream_noalert_policies_generation() throws Exception {
+ InMemMetadataServiceClient client = getSampleMetadataServiceWithNodataAlert();
+
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
+ IScheduleContext context = builder.buildContext();
+
+ PolicyDefinition policyDefinition = null;
+ PolicyDefinition aggrPolicyDefinition = null;
+ for (Entry<String, PolicyDefinition> entry : context.getPolicies().entrySet()) {
+ if (entry.getKey().endsWith("_nodata_alert")) {
+ policyDefinition = entry.getValue();
+ continue;
+ }
+ if (entry.getKey().endsWith("_aggregation_stream_policy")) {
+ aggrPolicyDefinition = entry.getValue();
+ continue;
+ }
+ }
+ Assert.assertEquals(3, context.getPolicies().size());
+
+ Assert.assertNotNull(policyDefinition);
+ Assert.assertEquals("nodataalert", policyDefinition.getDefinition().getType());
+ Assert.assertEquals("PT5S,dynamic,1," + COL1, policyDefinition.getDefinition().getValue());
+
+ Assert.assertNotNull(aggrPolicyDefinition);
+ Assert.assertEquals("siddhi", aggrPolicyDefinition.getDefinition().getType());
+
+ Kafka2TupleMetadata datasource = null;
+ for (Entry<String, Kafka2TupleMetadata> entry : context.getDataSourceMetadata().entrySet()) {
+ if ("nodata_alert_aggregation_ds".equals(entry.getKey())) {
+ datasource = entry.getValue();
+ break;
+ }
+ }
+ Assert.assertNotNull(datasource);
+
+ String publishmentName = policyDefinition.getName() + "_publish";
+ Publishment publishment = null;
+ for (Entry<String, Publishment> entry : context.getPublishments().entrySet()) {
+ if (publishmentName.equals(entry.getKey())) {
+ publishment = entry.getValue();
+ break;
+ }
+ }
+ Assert.assertNotNull(publishment);
+ }
@Test
public void test_renamed_topologies() {
InMemMetadataServiceClient client = getSampleMetadataService();
- ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
IScheduleContext context = builder.buildContext();
Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
@@ -226,6 +280,33 @@ public class ScheduleContextBuilderTest {
client.addScheduleState(createScheduleState());
return client;
}
+
+ public static InMemMetadataServiceClient getSampleMetadataServiceWithNodataAlert() {
+ InMemMetadataServiceClient client = new InMemMetadataServiceClient();
+ client.addTopology(createSampleTopology());
+ client.addDataSource(createKafka2TupleMetadata());
+ client.addPolicy(createPolicy());
+ client.addPublishment(createPublishment());
+ client.addStreamDefinition(createStreamDefinitionWithNodataAlert());
+ client.addScheduleState(createScheduleState());
+ return client;
+ }
+
+ private static StreamDefinition createStreamDefinitionWithNodataAlert() {
+ StreamDefinition def = new StreamDefinition();
+ def.setStreamId(TEST_STREAM_DEF_1);
+ def.setDataSource(TEST_DATASOURCE_1);
+
+ StreamColumn col = new StreamColumn();
+ col.setName(COL1);
+ col.setRequired(true);
+ col.setType(Type.STRING);
+ col.setNodataExpression("PT5S,dynamic,1," + COL1);
+ def.getColumns().add(col);
+
+ return def;
+ }
+
private static ScheduleState createScheduleState() {
ScheduleState ss = new ScheduleState();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
new file mode 100644
index 0000000..a86b13a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
+import org.apache.eagle.alert.utils.ZookeeperEmbedded;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGreedyScheduleCoordinator {
+
+ public static class ScheduleZkState {
+ volatile boolean scheduleAcquired = false;
+ volatile boolean scheduleCompleted = false;
+ }
+
+ public static class GreedyScheduleCoordinator {
+
+ public int schedule(int input) {
+ ScheduleZkState scheduleZkState = new ScheduleZkState();
+ ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+ @Override
+ public void run() throws Exception {
+ scheduleZkState.scheduleAcquired = true;
+
+ while (!scheduleZkState.scheduleCompleted) {
+ Thread.sleep(2000);
+ }
+ }
+ };
+ ExclusiveExecutor.execute("/alert/test", exclusiveRunnable);
+ int waitMaxTimes = 0;
+ while (waitMaxTimes < 90) { //about 3 minutes waiting
+ if (!scheduleZkState.scheduleAcquired) {
+ waitMaxTimes ++;
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {}
+ continue;
+ }
+ try {
+ return input;
+ } finally {
+ //schedule completed
+ scheduleZkState.scheduleCompleted = true;
+ }
+ }
+ throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later");
+ }
+
+ }
+
+ ZookeeperEmbedded zkEmbed;
+
+ @Before
+ public void setUp() throws Exception {
+ zkEmbed = new ZookeeperEmbedded(2181);
+ zkEmbed.start();
+
+ Thread.sleep(2000);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ zkEmbed.shutdown();
+ }
+
+ @Test
+ public void testMain() throws Exception {
+ final GreedyScheduleCoordinator coordinator = new GreedyScheduleCoordinator();
+
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ System.out.println("output: " + coordinator.schedule(1));
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ }
+
+ }).start();
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ System.out.println("output: " + coordinator.schedule(2));
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ }
+
+ }).start();
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ System.out.println("output: " + coordinator.schedule(3));
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ }
+
+ }).start();
+
+ Thread.sleep(15000);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
index 363e661..1ef71a0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
@@ -37,6 +37,15 @@
"metadataDynamicCheck" : {
"initDelayMillis" : 1000,
"delayMillis" : 30000
+ },
+ "kafkaProducer": {
+ "bootstrapServers": "localhost:9092"
+ },
+ "email": {
+ "sender": "eagle@eagle.com",
+ "recipients": "test@eagle.com",
+ "mailSmtpHost": "test.eagle.com",
+ "mailSmtpPort": "25"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
index 361d6d1..63be6a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
@@ -37,6 +37,15 @@
"metadataDynamicCheck" : {
"initDelayMillis" : 1000,
"delayMillis" : 30000
+ },
+ "kafkaProducer": {
+ "bootstrapServers": "localhost:9092"
+ },
+ "email": {
+ "sender": "eagle@eagle.com",
+ "recipients": "test@eagle.com",
+ "mailSmtpHost": "test.eagle.com",
+ "mailSmtpPort": "25"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index ef9caf0..1e7aacc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -16,17 +16,17 @@
*/
package org.apache.eagle.alert.engine.evaluator;
+import java.util.Map;
+
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
*/
@@ -42,7 +42,8 @@ public class PolicyStreamHandlers {
if (SIDDHI_ENGINE.equals(definition.getType())) {
return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16
} else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) {
- return new NoDataPolicyHandler(sds);
+ // no data for an entire stream won't trigger gap alert (use local time & batch window instead)
+ return new NoDataPolicyTimeBatchHandler(sds);
} else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) {
return new AbsencePolicyHandler(sds);
} else if (CUSTOMIZED_ENGINE.equals(definition.getType())) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
new file mode 100644
index 0000000..357504e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistinctValuesInTimeBatchWindow {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class);
+
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+ // wisb (what is should be) set for expected full set value of multiple columns
+ @SuppressWarnings("rawtypes")
+ private volatile Set wisb = new HashSet();
+
+ private NoDataPolicyTimeBatchHandler handler;
+
+ /**
+ * map from value to max timestamp for this value
+ */
+ private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+
+ private long startTime = -1;
+ private long nextEmitTime = -1;
+ private long timeInMilliSeconds;
+
+ public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler,
+ long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) {
+ this.handler = handler;
+ this.timeInMilliSeconds = timeInMilliSeconds;
+ if (wisb != null) {
+ this.wisb = wisb;
+ }
+ }
+
+ public Map<Object, Long> distinctValues() {
+ return valueMaxTimeMap;
+ }
+
+ public void send(StreamEvent event, Object value, long timestamp) {
+ synchronized(this) {
+ if (startTime < 0) {
+ startTime = System.currentTimeMillis();
+
+ scheduler.scheduleAtFixedRate(new Runnable() {
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void run() {
+ try {
+ LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet());
+ synchronized (valueMaxTimeMap) {
+ boolean sendAlerts = false;
+
+ if (nextEmitTime < 0) {
+ nextEmitTime = startTime + timeInMilliSeconds;
+ }
+
+ if (System.currentTimeMillis() > nextEmitTime) {
+ startTime = nextEmitTime;
+ nextEmitTime += timeInMilliSeconds;
+ sendAlerts = true;
+ } else {
+ sendAlerts = false;
+ }
+
+ if (sendAlerts) {
+ // alert
+ handler.compareAndEmit(wisb, distinctValues().keySet(), event);
+ LOG.info("alert for wiri: {} compares to wisb: {}", distinctValues().keySet(), wisb);
+
+ if (distinctValues().keySet().size() > 0) {
+ wisb = new HashSet(distinctValues().keySet());
+ }
+ valueMaxTimeMap.clear();
+ LOG.info("Clear wiri & update wisb to {}", wisb);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("failed to run batch window for gap alert", t);
+ }
+ }
+
+ }, 0, timeInMilliSeconds / 2, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ if (valueMaxTimeMap.containsKey(value)) {
+ // remove that entry with old timestamp in timeSortedMap
+ long oldTime = valueMaxTimeMap.get(value);
+ if (oldTime >= timestamp) {
+ // no any effect as the new timestamp is equal or even less than
+ // old timestamp
+ return;
+ }
+ }
+ // update new timestamp in valueMaxTimeMap
+ valueMaxTimeMap.put(value, timestamp);
+
+ LOG.info("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
new file mode 100644
index 0000000..741fce4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.apache.storm.guava.base.Joiner;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
+ private Map<String, StreamDefinition> sds;
+
+ private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
+ // reuse PolicyDefinition.defintion.value field to store full set of values
+ // separated by comma
+ private volatile PolicyDefinition policyDef;
+ private volatile Collector<AlertStreamEvent> collector;
+ private volatile PolicyHandlerContext context;
+ private volatile NoDataWisbType wisbType;
+ private volatile DistinctValuesInTimeBatchWindow distinctWindow;
+
+ public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds){
+ this.sds = sds;
+ }
+
+ @Override
+ public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+ this.collector = collector;
+ this.context = context;
+ this.policyDef = context.getPolicyDefinition();
+ List<String> inputStreams = policyDef.getInputStreams();
+ // validate inputStreams has to contain only one stream
+ if (inputStreams.size() != 1)
+ throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
+ // validate outputStream has to contain only one stream
+ if (policyDef.getOutputStreams().size() != 1)
+ throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
+
+ String is = inputStreams.get(0);
+ StreamDefinition sd = sds.get(is);
+
+ String policyValue = policyDef.getDefinition().getValue();
+ // assume that no data alert policy value consists of "windowPeriod,
+ // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value,
+ // f2_value}
+ String[] segments = policyValue.split(",");
+ this.wisbType = NoDataWisbType.valueOf(segments[1]);
+ // for provided wisb values, need to parse, for dynamic wisb values, it
+ // is computed through a window
+ @SuppressWarnings("rawtypes")
+ Set wisbValues = null;
+ if (wisbType == NoDataWisbType.provided) {
+ wisbValues = new NoDataWisbProvidedParser().parse(segments);
+ }
+ long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
+ distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues);
+ // populate wisb field names
+ int numOfFields = Integer.parseInt(segments[2]);
+ for (int i = 3; i < 3 + numOfFields; i++) {
+ String fn = segments[i];
+ wisbFieldIndices.add(sd.getColumnIndex(fn));
+ }
+ }
+
+ @Override
+ public void send(StreamEvent event) throws Exception {
+ Object[] data = event.getData();
+
+ List<Object> columnValues = new ArrayList<>();
+ for (int i = 0; i < wisbFieldIndices.size(); i++) {
+ Object o = data[wisbFieldIndices.get(i)];
+ // convert value to string
+ columnValues.add(o.toString());
+ }
+ // use local timestamp rather than event timestamp
+ distinctWindow.send(event, columnValues, System.currentTimeMillis());
+ LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues());
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
+ // compare with wisbValues if wisbValues are already there for dynamic
+ // type
+ Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
+ LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri);
+ if (noDataValues != null && noDataValues.size() > 0) {
+ LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb);
+
+ String is = policyDef.getOutputStreams().get(0);
+ StreamDefinition sd = sds.get(is);
+ int timestampIndex = sd.getColumnIndex("timestamp");
+ int hostIndex = sd.getColumnIndex("host");
+ int originalStreamNameIndex = sd.getColumnIndex("originalStreamName");
+
+ for (Object one : noDataValues) {
+ Object[] triggerEvent = new Object[sd.getColumns().size()];
+ for (int i = 0; i < sd.getColumns().size(); i ++) {
+ if (i == timestampIndex) {
+ triggerEvent[i] = System.currentTimeMillis();
+ } else if (i == hostIndex) {
+ triggerEvent[hostIndex] = ((List) one).get(0);
+ } else if (i == originalStreamNameIndex) {
+ triggerEvent[originalStreamNameIndex] = event.getStreamId();
+ } else if (sd.getColumns().size() < i) {
+ LOG.error("strema event data have different lenght compare to column definition!");
+ } else {
+ triggerEvent[i] = sd.getColumns().get(i).getDefaultValue();
+ }
+ }
+ AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent);
+ LOG.info(String.format("Nodata alert %s generated and will be emitted", Joiner.on(",").join(triggerEvent)));
+ collector.emit(alertEvent);
+ }
+
+ }
+ }
+
+ private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) {
+ AlertStreamEvent event = new AlertStreamEvent();
+ event.setTimestamp(timestamp);
+ event.setData(triggerEvent);
+ event.setStreamId(policyDef.getOutputStreams().get(0));
+ event.setPolicy(context.getPolicyDefinition());
+ if (this.context.getPolicyEvaluator() != null) {
+ event.setCreatedBy(context.getPolicyEvaluator().getName());
+ }
+ event.setCreatedTime(System.currentTimeMillis());
+ event.setSchema(sd);
+ return event;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
index bf2a954..f30bf8f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -43,7 +43,7 @@ public class JsonEventSerializer implements IEventSerializer {
public Object serialize(AlertStreamEvent event) {
String result = streamEventToJson(event);
if (LOG.isDebugEnabled()) {
- LOG.debug("serialized alert event : ", result);
+ LOG.debug("serialized alert event : {}", result);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
new file mode 100644
index 0000000..43400c7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.siddhi.extension;
+
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+
+import com.google.common.collect.ImmutableList;
+
+public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
+
+ private LinkedList<Object> value;
+
+ public AttributeCollectWithDistinctAggregator() {
+ value = new LinkedList<Object>();
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public Object[] currentState() {
+ return value.toArray();
+ }
+
+ @Override
+ public void restoreState(Object[] arg0) {
+ value = new LinkedList<Object>();
+ if (arg0 != null) {
+ for (Object o : arg0) {
+ value.add(o);
+ }
+ }
+ }
+
+ @Override
+ public Type getReturnType() {
+ return Attribute.Type.OBJECT;
+ }
+
+ @Override
+ protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
+ // TODO: Support max of elements?
+ }
+
+ @Override
+ public Object processAdd(Object arg0) {
+ // AttributeAggregator.process is already synchronized
+ if (value.contains(arg0)) {
+ value.remove(arg0);
+ }
+ value.add(arg0);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processAdd: current values are : " + value);
+ }
+ return ImmutableList.copyOf(value);
+ }
+
+ @Override
+ public Object processAdd(Object[] arg0) {
+ // AttributeAggregator.process is already synchronized
+ if (value.contains(arg0)) {
+ value.remove(arg0);
+ }
+ value.add(arg0);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processAdd: current values are : " + value);
+ }
+ return ImmutableList.copyOf(value);
+ }
+
+ // / NOTICE: non O(1)
+ @Override
+ public Object processRemove(Object arg0) {
+ value.remove(arg0);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("processRemove: current values are : " + value);
+ }
+ return ImmutableList.copyOf(value);
+ }
+
+ // / NOTICE: non O(1)
+ @Override
+ public Object processRemove(Object[] arg0) {
+ value.remove(arg0);
+ LOG.info("processRemove: current values are : " + value);
+ return ImmutableList.copyOf(value);
+ }
+
+ @Override
+ public Object reset() {
+ value.clear();
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
index 4ce9805..16569a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
@@ -16,3 +16,4 @@
#
collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
+collectWithDistinct=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectWithDistinctAggregator
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
new file mode 100644
index 0000000..72ef02b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeBatchWindow;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDistinctValuesInTimeBatchWindow {
+
+ private static final String inputStream = "testInputStream";
+
+ private NoDataPolicyTimeBatchHandler handler;
+
+ @Before
+ public void setup() {
+ handler = mock(NoDataPolicyTimeBatchHandler.class);
+ }
+
+ @After
+ public void teardown() {
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+ // wisb is null since it is dynamic mode
+ DistinctValuesInTimeBatchWindow window = new DistinctValuesInTimeBatchWindow(handler, 5 * 1000, null);
+
+ long now = System.currentTimeMillis();
+
+ // handler.compareAndEmit(anyObject(), anyObject(), anyObject());
+
+ // event time
+ sendEventToWindow(window, now, "host1", 95.5);
+
+ Thread.sleep(6000);
+
+ sendEventToWindow(window, now, "host1", 91.0);
+ sendEventToWindow(window, now, "host2", 95.5);
+ sendEventToWindow(window, now, "host2", 97.1);
+
+ Thread.sleep(3000);
+
+ sendEventToWindow(window, now, "host1", 90.7);
+
+ Thread.sleep(4000);
+
+ sendEventToWindow(window, now, "host1", 90.7);
+
+ Thread.sleep(3000);
+
+ verify(handler, times(3)).compareAndEmit(anyObject(), anyObject(), anyObject());
+ }
+
+ private void sendEventToWindow(DistinctValuesInTimeBatchWindow window, long ts, String host, double value) {
+ window.send(buildStreamEvent(ts, host, value), host, ts);
+ }
+
+ private StreamEvent buildStreamEvent(long ts, String host, double value) {
+ StreamEvent e = new StreamEvent();
+ e.setData(new Object[] { ts, host, value });
+ e.setStreamId(inputStream);
+ e.setTimestamp(ts);
+ return e;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
new file mode 100644
index 0000000..02d19b4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestNoDataPolicyTimeBatchHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
+
+ private static final String inputStream = "testInputStream";
+ private static final String outputStream = "testOutputStream";
+
+ @Before
+ public void setup() {
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testDynamic1() throws Exception {
+ Map<String, StreamDefinition> sds = new HashMap<>();
+ sds.put("testInputStream", buildStreamDef());
+ sds.put("testOutputStream", buildOutputStreamDef());
+ NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds);
+
+ PolicyHandlerContext context = new PolicyHandlerContext();
+ context.setPolicyDefinition(buildPolicyDef_dynamic());
+ handler.prepare(new TestCollector(), context);
+
+ long now = System.currentTimeMillis();
+
+ handler.send(buildStreamEvt(now, "host1", 12.5));
+
+ Thread.sleep(2000);
+
+ handler.send(buildStreamEvt(now, "host2", 12.6));
+ handler.send(buildStreamEvt(now, "host1", 20.9));
+ handler.send(buildStreamEvt(now, "host2", 22.1));
+ handler.send(buildStreamEvt(now, "host2", 22.1));
+
+ Thread.sleep(5000);
+
+ handler.send(buildStreamEvt(now, "host2", 22.1));
+ handler.send(buildStreamEvt(now, "host2", 22.3));
+
+ Thread.sleep(5000);
+
+ handler.send(buildStreamEvt(now, "host2", 22.9));
+ handler.send(buildStreamEvt(now, "host1", 41.6));
+ handler.send(buildStreamEvt(now, "host2", 45.6));
+
+ Thread.sleep(1000);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static class TestCollector implements Collector{
+ @Override
+ public void emit(Object o) {
+ AlertStreamEvent e = (AlertStreamEvent)o;
+ Object[] data = e.getData();
+
+ LOG.info("alert data: {}, {}", data[1], data[0]);
+
+ Assert.assertEquals("host1", data[1]);
+ }
+ }
+
+ private PolicyDefinition buildPolicyDef_dynamic() {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("PT5S,dynamic,1,host");
+ def.setType("nodataalert");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList(inputStream));
+ pd.setOutputStreams(Arrays.asList(outputStream));
+ pd.setName("nodataalert-test");
+ return pd;
+ }
+
+ private StreamDefinition buildStreamDef() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("value");
+ valueColumn.setType(StreamColumn.Type.DOUBLE);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testInputStream");
+ return sd;
+ }
+
+ private StreamDefinition buildOutputStreamDef() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("originalStreamName");
+ valueColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testOutputStream");
+ return sd;
+ }
+
+ private StreamEvent buildStreamEvt(long ts, String host, double value) {
+ StreamEvent e = new StreamEvent();
+ e.setData(new Object[] { ts, host, value });
+ e.setStreamId(inputStream);
+ e.setTimestamp(ts);
+ return e;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
new file mode 100644
index 0000000..4ce9805
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
index 73e5b30..3f2fbc3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
@@ -18,12 +18,12 @@
"numOfTotalWorkers": 20,
"numOfSpoutTasks" : 1,
"numOfRouterBolts" : 4,
- "numOfAlertBolts" : 10,
+ "numOfAlertBolts" : 20,
"numOfPublishTasks" : 1,
"localMode" : "true"
},
"spout" : {
- "kafkaBrokerZkQuorum": "localhost:2181",
+ "kafkaBrokerZkQuorum": "127.0.0.1:2181",
"kafkaBrokerZkBasePath": "/brokers",
"stormKafkaUseSameZkQuorumWithKafkaBroker": true,
"stormKafkaTransactionZkQuorum": "",
@@ -33,7 +33,7 @@
"stormKafkaFetchSizeBytes": 1048586,
},
"zkConfig" : {
- "zkQuorum" : "localhost:2181",
+ "zkQuorum" : "127.0.0.1:2181",
"zkRoot" : "/alert",
"zkSessionTimeoutMs" : 10000,
"connectionTimeoutMs" : 10000,
@@ -55,6 +55,12 @@
"context" : "/rest"
},
"kafkaProducer": {
- "bootstrapServers": "localhost:9092"
- }
+ "bootstrapServers": "127.0.0.1:9092"
+ },
+ "email": {
+ "sender": "eagle@eagle.com",
+ "recipients": "test@eagle.com",
+ "mail.smtp.host": "test.eagle.com",
+ "mail.smtp.port": "25"
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
index 411cc48..984fcdb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
@@ -2,7 +2,7 @@
{
"name": "alertUnitTopology_1",
"numOfSpout":1,
- "numOfAlertBolt": 10,
+ "numOfAlertBolt": 20,
"numOfGroupBolt": 4,
"spoutId": "alertEngineSpout",
"groupNodeIds" : [
@@ -21,7 +21,17 @@
"alertBolt6",
"alertBolt7",
"alertBolt8",
- "alertBolt9"
+ "alertBolt9",
+ "alertBolt10",
+ "alertBolt11",
+ "alertBolt12",
+ "alertBolt13",
+ "alertBolt14",
+ "alertBolt15",
+ "alertBolt16",
+ "alertBolt17",
+ "alertBolt18",
+ "alertBolt19"
],
"pubBoltId" : "alertPublishBolt",
"spoutParallelism": 1,
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
index 97edc5a..72a731a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
@@ -20,7 +20,7 @@
"policyDefaultParallelism" : 5,
"boltLoadUpbound": 0.8,
"topologyLoadUpbound" : 0.8,
- "numOfAlertBoltsPerTopology" : 5,
+ "numOfAlertBoltsPerTopology" : 20,
"zkConfig" : {
"zkQuorum" : "127.0.0.1:2181",
"zkRoot" : "/alert",
@@ -37,6 +37,15 @@
"metadataDynamicCheck" : {
"initDelayMillis" : 1000,
"delayMillis" : 30000
+ },
+ "kafkaProducer": {
+ "bootstrapServers": "127.0.0.1:9092"
+ },
+ "email": {
+ "sender": "eagle@eagle.com",
+ "recipients": "test@eagle.com",
+ "mailSmtpHost": "test.eagle.com",
+ "mailSmtpPort": "25"
}
},
"datastore": {