You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/11 23:53:50 UTC
[2/2] incubator-eagle git commit: EAGLE-370 absence alert engine
absence alert engine
EAGLE-370 absence alert engine
absence alert engine
Author: Yong Zhang <yo...@apache.org>
Reviewer: Yong Zhang
Closes: #262
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1dffec09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1dffec09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1dffec09
Branch: refs/heads/develop
Commit: 1dffec09ccb084c484e0427d2ce2cb567403cf21
Parents: 0b77d94
Author: yonzhang <yo...@gmail.com>
Authored: Mon Jul 11 16:56:46 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Mon Jul 11 16:56:46 2016 -0700
----------------------------------------------------------------------
.../engine/evaluator/PolicyStreamHandlers.java | 8 +-
.../evaluator/absence/AbsenceAlertDriver.java | 68 ++++++++
.../evaluator/absence/AbsenceDailyRule.java | 26 +++
.../evaluator/absence/AbsencePolicyHandler.java | 134 +++++++++++++++
.../engine/evaluator/absence/AbsenceRule.java | 23 +++
.../engine/evaluator/absence/AbsenceWindow.java | 38 +++++
.../absence/AbsenceWindowGenerator.java | 50 ++++++
.../absence/AbsenceWindowProcessor.java | 97 +++++++++++
.../impl/DistinctValuesInTimeWindow.java | 141 ---------------
.../nodata/DistinctValuesInTimeWindow.java | 141 +++++++++++++++
.../evaluator/nodata/NoDataPolicyHandler.java | 28 ++-
.../publisher/impl/JsonEventSerializer.java | 71 ++++++++
.../alert/engine/absence/TestAbsenceDriver.java | 96 +++++++++++
.../absence/TestAbsencePolicyHandler.java | 111 ++++++++++++
.../absence/TestAbsenceWindowGenerator.java | 80 +++++++++
.../absence/TestAbsenceWindowProcessor.java | 70 ++++++++
.../engine/e2e/Integration5AbsenceAlert.java | 94 ++++++++++
.../engine/e2e/SampleClient5AbsenceAlert.java | 93 ++++++++++
.../nodata/TestDistinctValuesInTimeWindow.java | 2 +-
.../alert/engine/nodata/TestNoDataAlert.java | 33 +++-
.../engine/nodata/TestNoDataPolicyHandler.java | 4 +-
.../resources/absence/application-absence.conf | 60 +++++++
.../src/test/resources/absence/datasources.json | 17 ++
.../src/test/resources/absence/policies.json | 24 +++
.../test/resources/absence/publishments.json | 20 +++
.../resources/absence/streamdefinitions.json | 29 ++++
.../src/test/resources/absence/topologies.json | 31 ++++
.../src/test/resources/nodata/policies.json | 5 +-
.../eagle-machinelearning-base/pom.xml | 37 ----
.../apache/eagle/ml/MLAlgorithmEvaluator.java | 50 ------
.../org/apache/eagle/ml/MLAnomalyCallback.java | 28 ---
.../java/org/apache/eagle/ml/MLConstants.java | 24 ---
.../java/org/apache/eagle/ml/MLModelDAO.java | 30 ----
.../org/apache/eagle/ml/MLPolicyEvaluator.java | 170 -------------------
.../eagle/ml/impl/MLAnomalyCallbackImpl.java | 107 ------------
.../apache/eagle/ml/impl/MLModelDAOImpl.java | 62 -------
.../MLPolicyEvaluatorServiceProviderImpl.java | 52 ------
.../org/apache/eagle/ml/model/MLAlgorithm.java | 61 -------
.../apache/eagle/ml/model/MLCallbackResult.java | 137 ---------------
.../eagle/ml/model/MLEntityRepository.java | 25 ---
.../apache/eagle/ml/model/MLModelAPIEntity.java | 67 --------
.../eagle/ml/model/MLPolicyDefinition.java | 82 ---------
.../eagle/ml/utils/MLReflectionUtils.java | 38 -----
....eagle.policy.PolicyEvaluatorServiceProvider | 16 --
....eagle.policy.PolicyEvaluatorServiceProvider | 16 --
.../src/test/resources/application.conf | 57 -------
.../src/test/resources/log4j.properties | 34 ----
.../test/resources/ml-policyDef-UserProfile.txt | 51 ------
eagle-core/eagle-machinelearning/pom.xml | 33 ----
eagle-core/pom.xml | 1 -
....eagle.policy.PolicyEvaluatorServiceProvider | 3 +-
51 files changed, 1443 insertions(+), 1332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index e8f736c..638b240 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -19,19 +19,23 @@ package org.apache.eagle.alert.engine.evaluator;
import java.util.Map;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
public class PolicyStreamHandlers {
public static final String SIDDHI_ENGINE ="siddhi";
public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
+ public static final String ABSENCE_ALERT_ENGINE ="absencealert";
public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
if(SIDDHI_ENGINE.equals(type)) {
return new SiddhiPolicyHandler(sds);
}else if(NO_DATA_ALERT_ENGINE.equals(type)){
return new NoDataPolicyHandler(sds);
+ }else if(ABSENCE_ALERT_ENGINE.equals(type)){
+ return new AbsencePolicyHandler(sds);
}
- throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
+ throw new IllegalArgumentException("Illegal policy stream handler type " + type);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
new file mode 100644
index 0000000..bf142cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/7/16.
+ * this assumes that event comes in time order
+ */
+public class AbsenceAlertDriver {
+ private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class);
+ private List<Object> expectedAttrs;
+ private AbsenceWindowProcessor processor;
+ private AbsenceWindowGenerator windowGenerator;
+
+ public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator){
+ this.expectedAttrs = expectedAttrs;
+ this.windowGenerator = windowGenerator;
+ }
+
+ public void process(List<Object> appearAttrs, long occurTime){
+ // initialize window
+ if(processor == null){
+ processor = nextProcessor(occurTime);
+ LOG.info("initialized a new window {}", processor);
+ }
+ processor.process(appearAttrs, occurTime);
+ AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
+ boolean expired = processor.checkExpired();
+ if(expired){
+ if(status == AbsenceWindowProcessor.OccurStatus.absent){
+ // send alert
+ LOG.info("this is an alert");
+ // figure out next window and set the new window
+ }
+ processor = nextProcessor(occurTime);
+ LOG.info("created a new window {}", processor);
+ }
+ }
+
+ /**
+ * calculate absolute time range based on current timestamp
+ * @param currTime milliseconds
+ * @return
+ */
+ private AbsenceWindowProcessor nextProcessor(long currTime){
+ AbsenceWindow window = windowGenerator.nextWindow(currTime);
+ return new AbsenceWindowProcessor(expectedAttrs, window);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
new file mode 100644
index 0000000..ed50280
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceDailyRule implements AbsenceRule {
+ public static final long DAY_MILLI_SECONDS = 86400*1000L;
+ public long startOffset;
+ public long endOffset;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
new file mode 100644
index 0000000..0a07a27
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Since 7/6/16.
+ * * policy would be like:
+ * {
+ "name": "absenceAlertPolicy",
+ "description": "absenceAlertPolicy",
+ "inputStreams": [
+ "absenceAlertStream"
+ ],
+ "outputStreams": [
+ "absenceAlertStream_out"
+ ],
+ "definition": {
+ "type": "absencealert",
+ "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "absenceAlertStream",
+ "type": "GROUPBY",
+ "columns" : ["jobID"]
+ }
+ ],
+ "parallelismHint": 2
+ }
+ */
+public class AbsencePolicyHandler implements PolicyStreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class);
+ private Map<String, StreamDefinition> sds;
+ private volatile PolicyDefinition policyDef;
+ private volatile Collector<AlertStreamEvent> collector;
+ private volatile PolicyHandlerContext context;
+ private volatile List<Integer> expectFieldIndices = new ArrayList<>();
+ private volatile List<Object> expectValues = new ArrayList<>();
+ private AbsenceAlertDriver driver;
+
+ public AbsencePolicyHandler(Map<String, StreamDefinition> sds){
+ this.sds = sds;
+ }
+
+ @Override
+ public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+ this.collector = collector;
+ this.context = context;
+ this.policyDef = context.getPolicyDefinition();
+ List<String> inputStreams = policyDef.getInputStreams();
+ // validate inputStreams has to contain only one stream
+ if(inputStreams.size() != 1)
+ throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
+ // validate outputStream has to contain only one stream
+ if(policyDef.getOutputStreams().size() != 1)
+ throw new IllegalArgumentException("policy outputStream size has to be 1 for absense alert");
+
+ String is = inputStreams.get(0);
+ StreamDefinition sd = sds.get(is);
+
+ String policyValue = policyDef.getDefinition().getValue();
+
+ // assume that absence alert policy value consists of "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset}
+ String[] segments = policyValue.split(",");
+ int offset = 0;
+ // populate wisb field names
+ int numOfFields = Integer.parseInt(segments[offset++]);
+ for(int i = offset; i < offset+numOfFields; i++){
+ String fn = segments[i];
+ expectFieldIndices.add(sd.getColumnIndex(fn));
+ }
+ offset += numOfFields;
+ for(int i = offset; i < offset+numOfFields; i++){
+ String fn = segments[i];
+ expectValues.add(fn);
+ }
+ offset += numOfFields;
+ String absence_window_rule_type = segments[offset++];
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date t1 = sdf.parse(segments[offset++]);
+ rule.startOffset = t1.getTime();
+ Date t2 = sdf.parse(segments[offset++]);
+ rule.endOffset = t2.getTime();
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+ driver = new AbsenceAlertDriver(expectValues, generator);
+ }
+
+ @Override
+ public void send(StreamEvent event) throws Exception {
+ Object[] data = event.getData();
+ List<Object> columnValues = new ArrayList<>();
+ for(int i=0; i<expectFieldIndices.size(); i++){
+ Object o = data[expectFieldIndices.get(i)];
+ // convert value to string
+ columnValues.add(o.toString());
+ }
+
+ driver.process(columnValues, event.getTimestamp());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
new file mode 100644
index 0000000..272d5cf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public interface AbsenceRule {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
new file mode 100644
index 0000000..728e702
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindow {
+ public long startTime;
+ public long endTime;
+
+ public String toString(){
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String t1 = sdf.format(new Date(startTime));
+ String t2 = sdf.format(new Date(endTime));
+ String format = "startTime=%d (%s), endTime=%d (%s)";
+ return String.format(format, startTime, t1, endTime, t2);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
new file mode 100644
index 0000000..6cd0880
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindowGenerator {
+ private AbsenceRule rule;
+ public AbsenceWindowGenerator(AbsenceRule rule){
+ this.rule = rule;
+ }
+
+ /**
+ * @param currTime
+ * @return
+ */
+ public AbsenceWindow nextWindow(long currTime){
+ AbsenceWindow window = new AbsenceWindow();
+ if(rule instanceof AbsenceDailyRule){
+ AbsenceDailyRule r = (AbsenceDailyRule)rule;
+ long adjustment = 0; // if today's window already expires, then adjust to tomorrow's window
+ if(currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset){
+ adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS;
+ }
+ // use current timestamp to round down to day
+ long day = currTime - currTime % AbsenceDailyRule.DAY_MILLI_SECONDS;
+ day += adjustment;
+ window.startTime = day + r.startOffset;
+ window.endTime = day + r.endOffset;
+ return window;
+ }else{
+ throw new UnsupportedOperationException("not supported rule " + rule);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
new file mode 100644
index 0000000..4e8d381
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/6/16.
+ * To process each incoming event
+ * internally maintain state machine to trigger alert when some attribute does not occur within this window
+ */
+public class AbsenceWindowProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(AbsenceWindowProcessor.class);
+ private List<Object> expectAttrs;
+ private AbsenceWindow window;
+ private boolean expired; // to mark if the time range has been went through
+ private OccurStatus status = OccurStatus.not_sure;
+
+ public enum OccurStatus{
+ not_sure,
+ occured,
+ absent
+ }
+
+ public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window){
+ this.expectAttrs = expectAttrs;
+ this.window = window;
+ expired = false;
+ }
+
+ /**
+ * return true if it is certain that expected attributes don't occur during startTime and endTime, else return false
+ * @param appearAttrs
+ * @param occurTime
+ * @return
+ */
+ public void process(List<Object> appearAttrs, long occurTime){
+ if(expired)
+ throw new IllegalStateException("Expired window can't recieve events");
+ switch(status) {
+ case not_sure:
+ if(occurTime < window.startTime) {
+ break;
+ }else if(occurTime >= window.startTime &&
+ occurTime <= window.endTime) {
+ if(expectAttrs.equals(appearAttrs)) {
+ status = OccurStatus.occured;
+ }
+ break;
+ }else{
+ status = OccurStatus.absent;
+ break;
+ }
+ case occured:
+ if(occurTime > window.endTime)
+ expired = true;
+ break;
+ default:
+ break;
+ }
+ // reset status
+ if(status == OccurStatus.absent){
+ expired = true;
+ }
+ }
+
+ public OccurStatus checkStatus(){
+ return status;
+ }
+ public boolean checkExpired(){
+ return expired;
+ }
+ public AbsenceWindow currWindow(){
+ return window;
+ }
+
+ public String toString(){
+ return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
deleted file mode 100644
index 8a681da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * Since 6/28/16.
- * to get distinct values within a specified time window
- * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
- * timeSortedMap : map sorted by timestamp first and then value
- * With the above 2 data structure, we can get distinct values in LOG(N)
- */
-public class DistinctValuesInTimeWindow {
- public static class ValueAndTime{
- Object value;
- long timestamp;
- public ValueAndTime(Object value, long timestamp){
- this.value = value;
- this.timestamp = timestamp;
- }
-
- public String toString(){
- return "[" + value + "," + timestamp + "]";
- }
-
- public int hashCode(){
- return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
- }
-
- public boolean equals(Object that){
- if(!(that instanceof ValueAndTime))
- return false;
- ValueAndTime another = (ValueAndTime)that;
- return another.timestamp == this.timestamp && another.value.equals(this.value);
- }
- }
-
- public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
- @Override
- public int compare(ValueAndTime o1, ValueAndTime o2) {
- if(o1.timestamp != o2.timestamp)
- return (o1.timestamp > o2.timestamp) ? 1 : -1;
- if(o1.value.equals(o2.value))
- return 0;
- else {
- // this is not strictly correct, but I don't want to write too many comparators here :-)
- if(o1.hashCode() > o2.hashCode())
- return 1;
- else
- return -1;
- }
- }
- }
-
- /**
- * map from value to max timestamp for this value
- */
- private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
- /**
- * map sorted by time(max timestamp for the value) and then value
- */
- private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
- private long maxTimestamp = 0L;
- private long window;
- private boolean windowSlided;
-
- /**
- * @param window - milliseconds
- */
- public DistinctValuesInTimeWindow(long window){
- this.window = window;
- }
-
- public void send(Object value, long timestamp){
- ValueAndTime vt = new ValueAndTime(value, timestamp);
-
- // todo think of time out of order
- if(valueMaxTimeMap.containsKey(value)){
- // remove that entry with old timestamp in timeSortedMap
- long oldTime = valueMaxTimeMap.get(value);
- if(oldTime >= timestamp){
- // no any effect as the new timestamp is equal or even less than old timestamp
- return;
- }
- timeSortedMap.remove(new ValueAndTime(value, oldTime));
- }
- // insert entry with new timestamp in timeSortedMap
- timeSortedMap.put(vt, vt);
- // update new timestamp in valueMaxTimeMap
- valueMaxTimeMap.put(value, timestamp);
-
- // evict old entries
- // store max timestamp if possible
- maxTimestamp = Math.max(maxTimestamp, timestamp);
-
- // check if some values should be evicted because of time window
- Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
- while(it.hasNext()){
- Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
- if(entry.getKey().timestamp < maxTimestamp - window){
- // should remove the entry in valueMaxTimeMap and timeSortedMap
- valueMaxTimeMap.remove(entry.getKey().value);
- windowSlided = true;
-
- it.remove();
- }else {
- break;
- }
- }
- }
-
- public Map<Object, Long> distinctValues(){
- return valueMaxTimeMap;
- }
-
- public boolean windowSlided(){
- return windowSlided;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..676357a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Since 6/28/16.
+ * to get distinct values within a specified time window
+ * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
+ * timeSortedMap : map sorted by timestamp first and then value
+ * With the above 2 data structure, we can get distinct values in LOG(N)
+ */
+public class DistinctValuesInTimeWindow {
+ public static class ValueAndTime{
+ Object value;
+ long timestamp;
+ public ValueAndTime(Object value, long timestamp){
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public String toString(){
+ return "[" + value + "," + timestamp + "]";
+ }
+
+ public int hashCode(){
+ return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
+ }
+
+ public boolean equals(Object that){
+ if(!(that instanceof ValueAndTime))
+ return false;
+ ValueAndTime another = (ValueAndTime)that;
+ return another.timestamp == this.timestamp && another.value.equals(this.value);
+ }
+ }
+
+ public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
+ @Override
+ public int compare(ValueAndTime o1, ValueAndTime o2) {
+ if(o1.timestamp != o2.timestamp)
+ return (o1.timestamp > o2.timestamp) ? 1 : -1;
+ if(o1.value.equals(o2.value))
+ return 0;
+ else {
+ // this is not strictly correct, but I don't want to write too many comparators here :-)
+ if(o1.hashCode() > o2.hashCode())
+ return 1;
+ else
+ return -1;
+ }
+ }
+ }
+
+ /**
+ * map from value to max timestamp for this value
+ */
+ private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+ /**
+ * map sorted by time(max timestamp for the value) and then value
+ */
+ private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
+ private long maxTimestamp = 0L;
+ private long window;
+ private boolean windowSlided;
+
+ /**
+ * @param window - milliseconds
+ */
+ public DistinctValuesInTimeWindow(long window){
+ this.window = window;
+ }
+
+ public void send(Object value, long timestamp){
+ ValueAndTime vt = new ValueAndTime(value, timestamp);
+
+ // todo think of time out of order
+ if(valueMaxTimeMap.containsKey(value)){
+ // remove that entry with old timestamp in timeSortedMap
+ long oldTime = valueMaxTimeMap.get(value);
+ if(oldTime >= timestamp){
+ // no any effect as the new timestamp is equal or even less than old timestamp
+ return;
+ }
+ timeSortedMap.remove(new ValueAndTime(value, oldTime));
+ }
+ // insert entry with new timestamp in timeSortedMap
+ timeSortedMap.put(vt, vt);
+ // update new timestamp in valueMaxTimeMap
+ valueMaxTimeMap.put(value, timestamp);
+
+ // evict old entries
+ // store max timestamp if possible
+ maxTimestamp = Math.max(maxTimestamp, timestamp);
+
+ // check if some values should be evicted because of time window
+ Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
+ while(it.hasNext()){
+ Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
+ if(entry.getKey().timestamp < maxTimestamp - window){
+ // should remove the entry in valueMaxTimeMap and timeSortedMap
+ valueMaxTimeMap.remove(entry.getKey().value);
+ windowSlided = true;
+
+ it.remove();
+ }else {
+ break;
+ }
+ }
+ }
+
+ public Map<Object, Long> distinctValues(){
+ return valueMaxTimeMap;
+ }
+
+ public boolean windowSlided(){
+ return windowSlided;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index ed13f71..6e5beb6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -29,7 +29,6 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.utils.TimePeriodUtils;
@@ -50,6 +49,29 @@ import org.slf4j.LoggerFactory;
* fixed fields and dynamic fields
* fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
* dynamic fields depend on wisb type.
+ *
+ * policy would be like:
+ * {
+ "name": "noDataAlertPolicy",
+ "description": "noDataAlertPolicy",
+ "inputStreams": [
+ "noDataAlertStream"
+ ],
+ "outputStreams": [
+ "noDataAlertStream_out"
+ ],
+ "definition": {
+ "type": "nodataalert",
+ "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "noDataAlertStream",
+ "type": "GROUPBY"
+ }
+ ],
+ "parallelismHint": 2
+ }
*/
public class NoDataPolicyHandler implements PolicyStreamHandler{
private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
@@ -61,10 +83,10 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
// reuse PolicyDefinition.defintion.value field to store full set of values separated by comma
private volatile PolicyDefinition policyDef;
- private volatile DistinctValuesInTimeWindow distinctWindow;
private volatile Collector<AlertStreamEvent> collector;
private volatile PolicyHandlerContext context;
private volatile NoDataWisbType wisbType;
+ private volatile DistinctValuesInTimeWindow distinctWindow;
public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
this.sds = sds;
@@ -161,4 +183,4 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
public void close() throws Exception {
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
new file mode 100644
index 0000000..bf2a954
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Jul 9, 2016
+ *
+ */
+public class JsonEventSerializer implements IEventSerializer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JsonEventSerializer.class);
+
+ @SuppressWarnings("rawtypes")
+ public JsonEventSerializer(Map stormConf) throws Exception {
+ }
+
+ @Override
+ public Object serialize(AlertStreamEvent event) {
+ String result = streamEventToJson(event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("serialized alert event : ", result);
+ }
+ return result;
+ }
+
+ public String streamEventToJson(AlertStreamEvent event) {
+ Map<String, Object> jsonMap = new HashMap<String, Object>();
+ jsonMap.put("policyId", event.getPolicyId());
+ jsonMap.put("streamId", event.getStreamId());
+ jsonMap.put("createBy", event.getCreatedBy());
+ jsonMap.put("createTime", event.getCreatedTime());
+ // data
+ int size = event.getData().length;
+ List<StreamColumn> columns = event.getSchema().getColumns();
+ for (int i = 0; i < size; i++) {
+ if (columns.size() < i) {
+ // redudant check to log inconsistency
+ LOG.error(" strema event data have different lenght compare to column definition! ");
+ } else {
+ jsonMap.put(columns.get(i).getName(), event.getData()[i]);
+ }
+ }
+ return JsonUtils.writeValueAsString(jsonMap);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
new file mode 100644
index 0000000..ca5bfdf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceAlertDriver;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16.
+ */
+public class TestAbsenceDriver {
+ @Test
+ public void testAbsence() throws Exception{
+ // from 2PM to 3PM each day
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+ List<Object> expectAttrs = Arrays.asList("host1");
+ AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
+
+ // first event came in 2016-07-08 11:20:00
+ String date = "2016-07-08 11:20:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long baseOccurTime = d.getTime();
+
+ // first event
+ driver.process(Arrays.asList("host2"), baseOccurTime);
+ // event after 1 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+ // event after 2 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+ // event after 3 hour, enter this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+ // event after 3.5 hour, still in this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000 + 1800*1000);
+ // event after 4 hour, exit this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+ }
+
+ @Test
+ public void testOccurrence() throws Exception{
+ // from 2PM to 3PM each day
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+ List<Object> expectAttrs = Arrays.asList("host1");
+ AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
+
+ // first event came in 2016-07-08 11:20:00
+ String date = "2016-07-08 11:20:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long baseOccurTime = d.getTime();
+
+ // first event
+ driver.process(Arrays.asList("host2"), baseOccurTime);
+ // event after 1 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+ // event after 2 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+ // event after 3 hour, enter this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+ // event after 3.5 hour, still in this window
+ driver.process(Arrays.asList("host1"), baseOccurTime + 3*3600*1000 + 1800*1000);
+ // event after 4 hour, exit this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
new file mode 100644
index 0000000..7f325c4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Since 7/8/16.
+ */
+public class TestAbsencePolicyHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(TestAbsencePolicyHandler.class);
+ private static final String inputStream = "testInputStream";
+ private static final String outputStream = "testOutputStream";
+
+ @Test
+ public void test() throws Exception{
+ test(buildPolicyDef_provided());
+ }
+
+ public void test(PolicyDefinition pd) throws Exception{
+ Map<String, StreamDefinition> sds = new HashMap<>();
+ StreamDefinition sd = buildStreamDef();
+ sds.put("testInputStream", sd);
+ AbsencePolicyHandler handler = new AbsencePolicyHandler(sds);
+
+ PolicyHandlerContext context = new PolicyHandlerContext();
+ context.setPolicyDefinition(pd);
+ handler.prepare(new TestCollector(), context);
+
+ handler.send(buildStreamEvt(0, "job1", "running"));
+ }
+
+ private static class TestCollector implements Collector {
+ @Override
+ public void emit(Object o) {
+ AlertStreamEvent e = (AlertStreamEvent)o;
+ Object[] data = e.getData();
+ Assert.assertEquals("host2", data[1]);
+ LOG.info(e.toString());
+ }
+ }
+
+ private PolicyDefinition buildPolicyDef_provided(){
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+ def.setType("absencealert");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList(inputStream));
+ pd.setOutputStreams(Arrays.asList(outputStream));
+ pd.setName("absencealert-test");
+ return pd;
+ }
+
+ private StreamDefinition buildStreamDef(){
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("jobID");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("status");
+ valueColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testStreamId");
+ return sd;
+ }
+
+ private StreamEvent buildStreamEvt(long ts, String jobID, String status){
+ StreamEvent e = new StreamEvent();
+ e.setData(new Object[]{ts, jobID, status});
+ e.setStreamId(inputStream);
+ e.setTimestamp(ts);
+ return e;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
new file mode 100644
index 0000000..e2345c9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16.
+ */
+public class TestAbsenceWindowGenerator {
+ @Test
+ public void testWindowInToday() throws Exception{
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ // from 2PM to 3PM each day
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+
+ // get current time
+ String date = "2016-07-08 00:00:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long startTimeOfDay = d.getTime();
+
+ String currDate = "2016-07-08 11:30:29";
+ df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ d = df.parse(currDate);
+ AbsenceWindow window = generator.nextWindow(d.getTime());
+ Assert.assertEquals(startTimeOfDay+rule.startOffset, window.startTime);
+ }
+
+ @Test
+ public void testWindowInTomorrow() throws Exception{
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ // from 2PM to 3PM each day
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+
+ // get current time
+ String date = "2016-07-08 00:00:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long startTimeOfDay = d.getTime();
+
+ String currDate = "2016-07-08 18:20:19";
+ df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ d = df.parse(currDate);
+ AbsenceWindow window = generator.nextWindow(d.getTime());
+ // this needs adjustment for one day
+ Assert.assertEquals(startTimeOfDay+rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
new file mode 100644
index 0000000..a47c7a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Since 7/6/16.
+ */
+public class TestAbsenceWindowProcessor {
+ @Test
+ public void testDataMissing(){
+ List<Object> expectedHosts = Arrays.asList("host1");
+ AbsenceWindow window = new AbsenceWindow();
+ window.startTime = 100L;
+ window.endTime = 200L;
+ AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+ processor.process(Arrays.asList("host2"), 90);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host3"), 101);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host3"), 138);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host2"), 189);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host2"), 201);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.absent);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testDataExists(){
+ List<Object> expectedHosts = Arrays.asList("host1");
+ AbsenceWindow window = new AbsenceWindow();
+ window.startTime = 100L;
+ window.endTime = 200L;
+ AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+ processor.process(Arrays.asList("host2"), 90);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host3"), 101);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host1"), 138);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
+ processor.process(Arrays.asList("host2"), 189);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
+ processor.process(Arrays.asList("host2"), 201);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
+ Assert.assertEquals(processor.checkExpired(), true);
+ processor.process(Arrays.asList("host2"), 225);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
new file mode 100644
index 0000000..52d1e5d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Since 6/29/16.
+ */
+public class Integration5AbsenceAlert {
+ private String[] args;
+
+ private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+ private static KafkaEmbedded kafka;
+
+ @BeforeClass
+ public static void setup() {
+ // FIXME : start local kafka
+ }
+
+ @AfterClass
+ public static void end() {
+ if (kafka != null) {
+ kafka.shutdown();
+ }
+ }
+ @Test @Ignore
+ public void testTriggerAbsenceAlert() throws Exception{
+ System.setProperty("config.resource", "/absence/application-absence.conf");
+ ConfigFactory.invalidateCaches();
+ Config config = ConfigFactory.load();
+
+ System.out.println("loading metadatas...");
+ Integration1.loadMetadatas("/absence/", config);
+ System.out.println("loading metadatas done!");
+
+
+ executors.submit(() -> {
+ try {
+ UnitTopologyMain.main(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ // wait 20 seconds for topology to bring up
+ try{
+ Thread.sleep(20000);
+ }catch(Exception ex){}
+
+ // send mock data
+ executors.submit(() -> {
+ try {
+ SampleClient5AbsenceAlert.main(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+
+ Utils.sleep(1000 * 5l);
+ while (true) {
+ Integration1.proactive_schedule(config);
+
+ Utils.sleep(1000 * 60l * 5);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
new file mode 100644
index 0000000..0256324
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Since 6/29/16.
+ */
+public class SampleClient5AbsenceAlert {
+ private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class);
+ private static long currentTimestamp = 1467240000000L;
+ private static long interval = 3000L;
+ public static void main(String[] args) throws Exception {
+ System.setProperty("config.resource", "/absence/application-absence.conf");
+ ConfigFactory.invalidateCaches();
+
+ Config config = ConfigFactory.load();
+ KafkaProducer producer = createProducer(config);
+ ProducerRecord record = null;
+ record = new ProducerRecord("absenceAlertTopic", createEvent("job1"));
+ producer.send(record);
+ record = new ProducerRecord("absenceAlertTopic", createEvent("job2"));
+ producer.send(record);
+ record = new ProducerRecord("absenceAlertTopic", createEvent("host3"));
+ producer.send(record);
+ }
+
+ private static class AbsenceEvent{
+ @JsonProperty
+ long timestamp;
+ @JsonProperty
+ String jobID;
+ @JsonProperty
+ String status;
+
+ public String toString(){
+ return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status;
+ }
+ }
+
+ private static String createEvent(String jobID) throws Exception{
+ AbsenceEvent e = new AbsenceEvent();
+ long expectTS = currentTimestamp + interval;
+ // adjust back 1 second random
+ long adjust = Math.round(2*Math.random());
+ e.timestamp = expectTS-adjust;
+ e.jobID = jobID;
+ e.status = "running";
+ LOG.info("sending event {} ", e);
+ ObjectMapper mapper = new ObjectMapper();
+ String value = mapper.writeValueAsString(e);
+ return value;
+ }
+
+
+ public static KafkaProducer<String, String> createProducer(Config config) {
+ String servers = config.getString("kafkaProducer.bootstrapServers");
+ Properties configMap = new Properties();
+ configMap.put("bootstrap.servers", servers);
+ configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("request.required.acks", "1");
+ configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap);
+ return proceduer;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
index f97b1a8..27744a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -21,7 +21,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
import org.junit.Test;
/**
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index 569a3b0..f50ad15 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -80,4 +80,35 @@ public class TestNoDataAlert {
// }
// Thread.sleep(10000);
}
-}
+
+ /**
+ * only alert when the successive 2 events has number of missing blocks changed
+ *from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
+ */
+ @Test
+ public void testMissingBlock() throws Exception{
+ ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+ "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+
+ "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+
+ "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+
+ "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+
+ "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
+ "b.site as site insert into outputStream;"
+ );
+
+ runtime.addCallback("outputStream", new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ EventPrinter.print(events);
+ }
+ });
+
+ runtime.start();
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
+
+
+ Thread.sleep(5000);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
index 6c48def..6305da8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -85,7 +85,7 @@ public class TestNoDataPolicyHandler {
PolicyDefinition pd = new PolicyDefinition();
PolicyDefinition.Definition def = new PolicyDefinition.Definition();
def.setValue("PT1M,provided,1,host,host1,host2");
- def.setType("string");
+ def.setType("nodataalert");
pd.setDefinition(def);
pd.setInputStreams(Arrays.asList(inputStream));
pd.setOutputStreams(Arrays.asList(outputStream));
@@ -97,7 +97,7 @@ public class TestNoDataPolicyHandler {
PolicyDefinition pd = new PolicyDefinition();
PolicyDefinition.Definition def = new PolicyDefinition.Definition();
def.setValue("PT1M,dynamic,1,host");
- def.setType("string");
+ def.setType("nodataalert");
pd.setDefinition(def);
pd.setInputStreams(Arrays.asList(inputStream));
pd.setOutputStreams(Arrays.asList(outputStream));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
new file mode 100644
index 0000000..82e3f15
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+ "topology" : {
+ "name" : "alertUnitTopology_1",
+ "numOfTotalWorkers": 20,
+ "numOfSpoutTasks" : 1,
+ "numOfRouterBolts" : 4,
+ "numOfAlertBolts" : 10,
+ "numOfPublishTasks" : 1,
+ "localMode" : "true"
+ },
+ "spout" : {
+ "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+ "kafkaBrokerZkBasePath": "/brokers",
+ "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+ "stormKafkaTransactionZkQuorum": "",
+ "stormKafkaTransactionZkPath": "/consumers",
+ "stormKafkaEagleConsumer": "eagle_consumer",
+ "stormKafkaStateUpdateIntervalMs": 2000,
+ "stormKafkaFetchSizeBytes": 1048586,
+ },
+ "zkConfig" : {
+ "zkQuorum" : "sandbox.hortonworks.com:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "dynamicConfigSource" : {
+ "initDelayMillis": 3000,
+ "delayMillis" : 10000
+ },
+ "metadataService": {
+ "context" : "/rest",
+ "host" : "localhost",
+ "port" : 8080
+ },
+ "coordinatorService": {
+ "host": "localhost",
+ "port": "8080",
+ "context" : "/rest"
+ },
+ "kafkaProducer": {
+ "bootstrapServers": "sandbox.hortonworks.com:6667"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
new file mode 100644
index 0000000..ed4d638
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
@@ -0,0 +1,17 @@
+[
+ {
+ "name": "absenceAlertDataSource",
+ "type": "KAFKA",
+ "properties": {},
+ "topic": "absenceAlertTopic",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+ "codec": {
+ "streamNameSelectorProp": {
+ "userProvidedStreamName": "noDataAlertStream"
+ },
+ "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat": ""
+ }
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
new file mode 100644
index 0000000..a7ce7dc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
@@ -0,0 +1,24 @@
+[
+ {
+ "name": "absenceAlertPolicy",
+ "description": "absenceAlertPolicy",
+ "inputStreams": [
+ "absenceAlertStream"
+ ],
+ "outputStreams": [
+ "absenceAlertStream_out"
+ ],
+ "definition": {
+ "type": "absencealert",
+ "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "absenceAlertStream",
+ "type": "GROUPBY",
+ "columns" : ["jobID"]
+ }
+ ],
+ "parallelismHint": 2
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
new file mode 100644
index 0000000..6e9260f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
@@ -0,0 +1,20 @@
+[
+ {
+ "name":"test-stream-output",
+ "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "policyIds": [
+ "absenceAlertPolicy"
+ ],
+ "properties": {
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "yonzhang@ebay.com",
+ "smtp.server":"atom.corp.ebay.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ },
+ "dedupIntervalMin" : "PT5M",
+ "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
new file mode 100644
index 0000000..4bd7319
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
@@ -0,0 +1,29 @@
+[
+ {
+ "streamId": "absenceAlertStream",
+ "dataSource": "absenceAlertDataSource",
+ "description": "the data stream for testing absence alert",
+ "validate": false,
+ "timeseries": false,
+ "columns": [
+ {
+ "name": "jobID",
+ "type": "STRING",
+ "defaultValue": "",
+ "required": true
+ },
+ {
+ "name": "timestamp",
+ "type": "LONG",
+ "defaultValue": 0,
+ "required": true
+ },
+ {
+ "name": "status",
+ "type": "STRING",
+ "defaultValue": "running",
+ "required": true
+ }
+ ]
+ }
+]
\ No newline at end of file