You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/11 23:53:49 UTC
[1/2] incubator-eagle git commit: EAGLE-370 absence alert engine
absence alert engine
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 0b77d947a -> 1dffec09c
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
new file mode 100644
index 0000000..2726aff
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
@@ -0,0 +1,31 @@
+[
+ {
+ "name": "alertUnitTopology_1",
+ "numOfSpout":1,
+ "numOfAlertBolt": 10,
+ "numOfGroupBolt": 4,
+ "spoutId": "alertEngineSpout",
+ "groupNodeIds" : [
+ "streamRouterBolt0",
+ "streamRouterBolt1",
+ "streamRouterBolt2",
+ "streamRouterBolt3"
+ ],
+ "alertBoltIds": [
+ "alertBolt0",
+ "alertBolt1",
+ "alertBolt2",
+ "alertBolt3",
+ "alertBolt4",
+ "alertBolt5",
+ "alertBolt6",
+ "alertBolt7",
+ "alertBolt8",
+ "alertBolt9"
+ ],
+ "pubBoltId" : "alertPublishBolt",
+ "spoutParallelism": 1,
+ "groupParallelism": 1,
+ "alertParallelism": 1
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
index 74f3016..a8e4a9c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
@@ -10,12 +10,13 @@
],
"definition": {
"type": "nodataalert",
- "value": "PT1M,dynamic,1,host"
+ "value": "PT1M,dynamic,1,host",
},
"partitionSpec": [
{
"streamId": "noDataAlertStream",
- "type": "GROUPBY"
+ "type": "GROUPBY",
+ "columns" : ["host"]
}
],
"parallelismHint": 2
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml
deleted file mode 100644
index 390705d..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-machinelearning-parent</artifactId>
- <version>0.5.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <artifactId>eagle-machinelearning-base</artifactId>
- <name>eagle-machinelearning-base</name>
- <url>http://maven.apache.org</url>
- <dependencies>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-alert-process</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
deleted file mode 100644
index 800b6e0..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.ml.model.MLAlgorithm;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-
-/**
- * Machine Learning Algorithm Evaluator
- */
-public interface MLAlgorithmEvaluator {
- /**
- * Prepare Machine learning algorithm
- *
- * @param algorithm MLAlgorithm instance
- */
- public void init(MLAlgorithm algorithm, Config config, PolicyEvaluationContext context);
-
- /**
- * Evaluate input user profile model
- *
- * @param data ValuesArray
- * @throws Exception
- */
- public void evaluate(ValuesArray data) throws Exception;
-
- /**
- * Register callback
- *
- * @param callbackObj MachineLearningCallback
- * @throws Exception
- */
- public void register(MLAnomalyCallback callbackObj) throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
deleted file mode 100644
index 5005c42..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.ml.model.MLCallbackResult;
-
-public interface MLAnomalyCallback {
- /**
- * @param callbackResult call-backed result
- * @param alertContext context
- */
- void receive(MLCallbackResult callbackResult,PolicyEvaluationContext alertContext);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java
deleted file mode 100644
index 9197d59..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.ml;
-
-public class MLConstants {
- public final static String ML_MODEL_SERVICE_NAME = "MLModelService";
- public final static String ALGORITHM = "algorithm";
- public final static String USER = "user";
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java
deleted file mode 100644
index 13eb7e1..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import org.apache.eagle.ml.model.MLModelAPIEntity;
-
-import java.util.List;
-
-public interface MLModelDAO {
- /**
- * @param user
- * @param algorithm
- * @return
- */
- List<MLModelAPIEntity> findMLModelByContext(String user, String algorithm);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
deleted file mode 100644
index 36bcaec..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.ml.impl.MLAnomalyCallbackImpl;
-import org.apache.eagle.ml.model.MLAlgorithm;
-import org.apache.eagle.ml.model.MLPolicyDefinition;
-import org.apache.eagle.ml.utils.MLReflectionUtils;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEntity> {
- private static Logger LOG = LoggerFactory.getLogger(MLPolicyEvaluator.class);
- private volatile MLRuntime mlRuntime;
- private Config config;
- private Map<String,String> context;
- private final PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext;
-
- private class MLRuntime{
- MLPolicyDefinition mlPolicyDef;
- MLAlgorithmEvaluator[] mlAlgorithmEvaluators;
- List<MLAnomalyCallback> mlAnomalyCallbacks = new ArrayList<>();
- }
-
- public MLPolicyEvaluator(Config config, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext, AbstractPolicyDefinition policyDef, String[] sourceStreams){
- this(config, evalContext, policyDef, sourceStreams, false);
- }
-
- /**
- * needValidation does not take effect for machine learning use case
- * @param policyDef
- * @param sourceStreams
- * @param needValidation
- */
- public MLPolicyEvaluator(Config config, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
- this.config = config;
- this.evalContext = evalContext;
- LOG.info("Initializing policy named: " + evalContext.policyId);
- this.context = new HashMap<>();
- this.context.put(Constants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
- this.init(policyDef);
- }
-
- public void init(AbstractPolicyDefinition policyDef){
- LOG.info("Initializing MLPolicyEvaluator ...");
- try{
- mlRuntime = newMLRuntime((MLPolicyDefinition) policyDef);
- }catch(Exception e){
- LOG.error("ML Runtime creation failed: " + e.getMessage());
- }
- }
-
- private MLRuntime newMLRuntime(MLPolicyDefinition mlPolicyDef) {
- MLRuntime runtime = new MLRuntime();
-
- try{
- runtime.mlPolicyDef = mlPolicyDef;
-
- LOG.info("policydef: " + ((runtime.mlPolicyDef == null)? "policy definition is null": "policy definition is not null"));
- Properties alertContext = runtime.mlPolicyDef.getContext();
- LOG.info("alert context received null? " + ((alertContext == null? "yes": "no")));
- MLAnomalyCallback callbackImpl = new MLAnomalyCallbackImpl(this, config);
- runtime.mlAnomalyCallbacks.add(callbackImpl);
-
- MLAlgorithm[] mlAlgorithms = mlPolicyDef.getAlgorithms();
- runtime.mlAlgorithmEvaluators = new MLAlgorithmEvaluator[mlAlgorithms.length];
- LOG.info("mlAlgorithms size:: " + mlAlgorithms.length);
- int i = 0;
- for(MLAlgorithm algorithm:mlAlgorithms){
- MLAlgorithmEvaluator mlAlgorithmEvaluator = MLReflectionUtils.newMLAlgorithmEvaluator(algorithm);
- mlAlgorithmEvaluator.init(algorithm, config, evalContext);
- runtime.mlAlgorithmEvaluators[i] = mlAlgorithmEvaluator;
- LOG.info("mlAlgorithmEvaluator: " + mlAlgorithmEvaluator.toString());
- mlAlgorithmEvaluator.register(callbackImpl);
- i++;
- }
- }catch(Exception ex){
- LOG.error("Failed to create runtime for policy named: "+this.getPolicyName(),ex);
- }
- return runtime;
- }
-
- @Override
- public void evaluate(ValuesArray data) throws Exception {
- LOG.info("Evaluate called with input: " + data.size());
- synchronized(mlRuntime){
- for(MLAlgorithmEvaluator mlAlgorithm:mlRuntime.mlAlgorithmEvaluators){
- mlAlgorithm.evaluate(data);
- }
- }
- }
-
- @Override
- public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) {
- LOG.info("onPolicyUpdate called");
- AbstractPolicyDefinition policyDef = null;
- try {
- policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
- AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get("policyType")));
- } catch (Exception ex) {
- LOG.error("initial policy def error, ", ex);
- }
- MLRuntime previous = mlRuntime;
- mlRuntime = newMLRuntime((MLPolicyDefinition) policyDef);
- synchronized (previous) {
- previous.mlAnomalyCallbacks = null;
- previous.mlAlgorithmEvaluators = null;
- previous.mlPolicyDef = null;
- }
- previous = null;
- }
-
- @Override
- public void onPolicyDelete() {
- LOG.info("onPolicyDelete called");
- MLRuntime previous = mlRuntime;
- synchronized (previous) {
- previous.mlAnomalyCallbacks = null;
- previous.mlAlgorithmEvaluators = null;
- previous.mlPolicyDef = null;
- }
- previous = null;
- }
-
- public String getPolicyName() {
- return evalContext.policyId;
- }
-
- @Override
- public Map<String, String> getAdditionalContext() {
- return this.context;
- }
-
- public List<String> getOutputStreamAttrNameList() {
- return new ArrayList<String>();
- }
-
- @Override
- public boolean isMarkdownEnabled() { return false; }
-
- @Override
- public String getMarkdownReason() { return null; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
deleted file mode 100644
index 1e8c013..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.impl;
-
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.ml.MLAnomalyCallback;
-import org.apache.eagle.ml.MLPolicyEvaluator;
-import org.apache.eagle.ml.model.MLCallbackResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
- private static Logger LOG = LoggerFactory.getLogger(MLAnomalyCallbackImpl.class);
- private MLPolicyEvaluator mlAlertEvaluator;
- private Config config;
-
-
- public static final String source = ManagementFactory.getRuntimeMXBean().getName();
-
- public MLAnomalyCallbackImpl(MLPolicyEvaluator mlAlertEvaluator, Config config){
- this.mlAlertEvaluator = mlAlertEvaluator;
- this.config = config;
- }
-
- /**
- * TODO: generate alert
- *
- * @param aResult
- * @param alertContext context
- */
- @Override
- public void receive(MLCallbackResult aResult,PolicyEvaluationContext alertContext) {
- LOG.info("Receive called with : " + aResult.toString());
- AlertAPIEntity alert = renderAlert(aResult,alertContext);
- alertContext.alertExecutor.onEvalEvents(alertContext, Arrays.asList(alert));
- }
-
- private AlertAPIEntity renderAlert(MLCallbackResult aResult,PolicyEvaluationContext alertContext){
- String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
- String applicatioin = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
-
- AlertAPIEntity entity = new AlertAPIEntity();
- entity.setDescription(aResult.toString());
-
- Map<String, String> tags = new HashMap<>();
- tags.put(EagleConfigConstants.SITE, site);
- tags.put(EagleConfigConstants.APPLICATION, applicatioin);
- tags.put(Constants.SOURCE_STREAMS, (String)alertContext.evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS));
- tags.put(Constants.POLICY_ID, alertContext.policyId);
- tags.put(Constants.ALERT_SOURCE, source);
- tags.put(Constants.ALERT_EXECUTOR_ID, alertContext.alertExecutor.getExecutorId());
- entity.setTags(tags);
-
- entity.setTimestamp(aResult.getTimestamp());
-
- AlertContext context = new AlertContext();
-
- if(aResult.getContext() != null) context.addAll(aResult.getContext());
-
- String alertMessage = "Anomaly activities detected by algorithm ["+aResult.getAlgorithmName()+"] with information: " + aResult.toString() ;
- context.addProperty(Constants.ALERT_EVENT, aResult.toString());
- context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
- context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
-
- try {
- site = config.getString("eagleProps.site");
- applicatioin = config.getString("eagleProps.application");
- context.addProperty(EagleConfigConstants.APPLICATION, applicatioin);
- context.addProperty(EagleConfigConstants.SITE, site);
- } catch (Exception ex) {
- LOG.error("site, dataSource not set in config file, ", ex);
- }
-
- context.addProperty(EagleConfigConstants.APPLICATION, applicatioin);
- context.addProperty(EagleConfigConstants.SITE, site);
- context.addProperty(Constants.POLICY_NAME, alertContext.policyId);
-
- entity.setAlertContext(context.toJsonString());
- return entity;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java
deleted file mode 100644
index 1e3067e..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.impl;
-
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.ml.MLConstants;
-import org.apache.eagle.ml.MLModelDAO;
-import org.apache.eagle.ml.model.MLModelAPIEntity;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.apache.commons.lang.time.DateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class MLModelDAOImpl implements MLModelDAO {
- private final Logger LOG = LoggerFactory.getLogger(MLModelDAOImpl.class);
- private final EagleServiceConnector connector;
-
- public MLModelDAOImpl(EagleServiceConnector connector){
- this.connector = connector;
- }
-
- @Override
- public List<MLModelAPIEntity> findMLModelByContext(String user, String algorithm) {
- try {
- IEagleServiceClient client = new EagleServiceClientImpl(connector);
- String query = MLConstants.ML_MODEL_SERVICE_NAME + "[@user=\"" + user + "\" AND @algorithm=\""
- + algorithm + "\"]{*}";
- GenericServiceAPIResponseEntity<MLModelAPIEntity> response = client.search().startTime(0)
- .endTime(10 * DateUtils.MILLIS_PER_DAY)
- .pageSize(Integer.MAX_VALUE)
- .query(query)
- .send();
- if(!response.isSuccess()) {
- LOG.error(String.format("Failed to get model for user: %s, algorithm: %s, due to: %s",user,algorithm,response.getException()));
- }
-
- client.close();
- return response.getObj();
- } catch (Exception ex) {
- LOG.info("Got an exception when query machinelearning model service ", ex);
- throw new IllegalStateException(ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
deleted file mode 100644
index d0ac75f..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.impl;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
-import org.apache.eagle.ml.MLPolicyEvaluator;
-import org.apache.eagle.ml.model.MLPolicyDefinition;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-public class MLPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServiceProvider {
-
- private final static String ALERT_CONTEXT = "alertContext";
-
- @Override
- public String getPolicyType() {
- return Constants.policyType.MachineLearning.name();
- }
-
- @Override
- public Class<? extends PolicyEvaluator> getPolicyEvaluator() {
- return MLPolicyEvaluator.class;
- }
-
- @Override
- public List<Module> getBindingModules() {
- Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(MLPolicyDefinition.class, getPolicyType()));
- Module module2 = new SimpleModule(ALERT_CONTEXT).registerSubtypes(new NamedType(Properties.class, getPolicyType()));
- return Arrays.asList(module1, module2);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java
deleted file mode 100644
index 24b4bad..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import java.io.Serializable;
-
-public class MLAlgorithm implements Serializable {
- private static final long serialVersionUID = -223531147520123L;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- private String name;
- private String evaluator;
- private String description;
-
- public String getFeatures() {
- return features;
- }
-
- public void setFeatures(String features) {
- this.features = features;
- }
-
- private String features;
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public String getEvaluator() {
- return evaluator;
- }
-
- public void setEvaluator(String evaluator) {
- this.evaluator = evaluator;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java
deleted file mode 100644
index b457973..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class MLCallbackResult {
- private boolean isAnomaly;
- private List<String> feature;
- private double confidence;
- private long timestamp;
- private List<String> datapoints;
- private String id;
- private String algorithmName;
-
- public Map<String, String> getContext() {
- return context;
- }
-
- public void setContext(Map<String, String> context) {
- this.context = context;
- }
-
- public void setAlgorithmName(String algorithmName) {
- this.algorithmName = algorithmName;
- }
-
- private Map<String,String> context;
-
- public String getAlgorithmName() {
- return algorithmName;
- }
- public void setAlgorithm(String algorithmName) {
- this.algorithmName = algorithmName;
- }
- public MLCallbackResult(){
- feature = new ArrayList<String>();
- setDatapoints(new ArrayList<String>());
- }
- public boolean isAnomaly() {
- return isAnomaly;
- }
- public void setAnomaly(boolean isAnomaly) {
- this.isAnomaly = isAnomaly;
- }
- public List<String> getFeature() {
- return feature;
- }
- public void setFeature(List<String> feature) {
- this.feature = feature;
- }
-
- private boolean doesFeatureExist(String f){
- boolean alreadyExist = false;
- for(String s:feature){
- if(s.equalsIgnoreCase(f))
- alreadyExist = true;
- }
- return alreadyExist;
- }
- public void setFeature(String aFeature) {
- if(doesFeatureExist(aFeature) == false)
- feature.add(aFeature);
- }
- public double getConfidence() {
- return confidence;
- }
- public void setConfidence(double confidence) {
- this.confidence = confidence;
- }
- public String toString(){
- StringBuffer resultStr = new StringBuffer();
- resultStr.append("datapoint :<");
- int i=0;
- for(String d:datapoints){
- if(i < datapoints.size()-1){
- resultStr.append(d.trim());
- resultStr.append(",");
- i++;
- }else
- resultStr.append(d.trim());
- }
- resultStr.append("> for user: ").append(id).append(" is anomaly: ").append(this.isAnomaly).append(" at timestamp: ").append(this.timestamp);
- if(this.isAnomaly){
- //resultStr.append(" with confidence: " + this.confidence);
- resultStr.append(" with algorithm: " + algorithmName);
- resultStr.append(" with features: ");
- int p=0;
- resultStr.append("[");
- for(String s:feature){
- if(p < feature.size()-1){
- resultStr.append(s.trim());
- resultStr.append(",");
- p++;
- }else
- resultStr.append(s.trim());
- }
- resultStr.append("]");
- }
- return resultStr.toString();
- }
-
- public long getTimestamp() {
- return timestamp;
- }
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
- public List<String> getDatapoints() {
- return datapoints;
- }
- public void setDatapoints(List<String> datapoints) {
- this.datapoints = datapoints;
- }
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java
deleted file mode 100644
index 883847a..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class MLEntityRepository extends EntityRepository {
- public MLEntityRepository() {
- entitySet.add(MLModelAPIEntity.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java
deleted file mode 100644
index 6f91ca6..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import org.apache.eagle.ml.MLConstants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-/**
- * DDL for creating the table
- * create 'mlmodel', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("mlmodel")
-@ColumnFamily("f")
-@Prefix("mlmodel")
-@Service(MLConstants.ML_MODEL_SERVICE_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site","user", "algorithm"})
-public class MLModelAPIEntity extends TaggedLogAPIEntity{
- @Column("b")
- private String content;
- @Column("c")
- private long version;
-
- public String getContent() {
- return content;
- }
-
- public void setContent(String content) {
- this.content = content;
- valueChanged("content");
- }
-
- public long getVersion() {
- return version;
- }
-
- public void setVersion(long version) {
- this.version = version;
- valueChanged("version");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
deleted file mode 100644
index 70e0f71..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import java.util.Properties;
-
-/**
- {
- "type":"MachineLearning",
- "context":{
- "site":"dev",
- "dataSource":"userprofile"
- "component":"dev-component",
- "description":"ML based user profile anomaly detection",
- "severity":"WARNING",
- "notificationByEmail":"true"
- },
- "algorithms":[
- {
- "name":"eagle.security.userprofile.util.EigenBasedAnomalyDetection",
- "description":"EigenBasedAnomalyDetection",
- "features":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete"
- },
- {
- "name":"eagle.security.userprofile.util.KDEBasedAnomalyDetection",
- "description":"DensityBasedAnomalyDetection",
- "features":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete"
- }
- ]
-}
- version field is used for model update, so eagle framework can understand that something changes.
-
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MLPolicyDefinition extends AbstractPolicyDefinition{
- private String version;
- private Properties context;
- private MLAlgorithm[] algorithms;
-
- public MLAlgorithm[] getAlgorithms() {
- return algorithms;
- }
-
- public void setAlgorithms(MLAlgorithm[] algorithms) {
- this.algorithms = algorithms;
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
- public Properties getContext() {
- return context;
- }
-
- public void setContext(Properties context) {
- this.context = context;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java
deleted file mode 100644
index fe70165..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.utils;
-
-import org.apache.eagle.ml.MLAlgorithmEvaluator;
-import org.apache.eagle.ml.model.MLAlgorithm;
-
-/**
- * @since 8/14/15
- */
-public class MLReflectionUtils {
- /**
- * Create a new MLAlgorithmEvaluator based on MLAlgorithm
- *
- * @param algorithm MLAlgorithm
- * @return MLAlgorithmEvaluator object instance
- * @throws ClassNotFoundException
- * @throws IllegalAccessException
- * @throws InstantiationException
- */
- public static MLAlgorithmEvaluator newMLAlgorithmEvaluator(MLAlgorithm algorithm) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
- return (MLAlgorithmEvaluator) Class.forName(algorithm.getEvaluator()).newInstance();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
deleted file mode 100644
index 2125cbc..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- "envContextConfig" : {
- "env" : "storm",
- "topologyName" : "securityLogProcessTopology",
- "mode" : "local",
- "parallelismConfig" : {
- }
- },
- "dataSourceConfig": {
- "flavor" : "stormhdfs",
- "hdfsConnection" : "10.225.92.42:9000",
- "hdfsPath" : "/tmp/user1",
- "copyToPath" : "/Users/user1/Security/hive_files/weeks_file/MLDemo_testing/consolidated",
- "fileFormat" : "CSV",
- "typeOperation" : "user profile generation",
- "userlist": "/UserList.txt",
- "containsFileHeader": true
- },
- "eagleProps" : {
- "application" : "hdfsAuditLog-ML",
- "env" : "test",
- "mail.host" : "mailHost.com",
- "mail.debug" : "true",
- "eagleService": {
- "host": "localhost",
- "port": 9099,
- "username": "admin",
- "password": "secret"
- }
- },
- "alertExecutorConfigs" : {
- "userAnomalousActivityDetectionAlertExecutor" : {
- "parallelism" : 2,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- }
- },
- "dynamicConfigSource" : {
- "eagleServiceHost" : "localhost",
- "eagleServicePort" : "8080",
- "enabled" : "true"
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties
deleted file mode 100644
index 71a5dac..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, DRFA, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt
deleted file mode 100644
index 2f4bcb1..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
- "type":"MachineLearning",
- "fileTypes":[
- {
- "type":"CSV",
- "containsHeader":"true",
- "fileParserClass":"eagle.ml.CSVFileFormatParser"
- }
- ],
- "alertContext":{
- "cluster":"testCluster",
- "datacenter":"testDataCenter",
- "component":"testComponent",
- "description":"ML based user profile anomaly detection",
- "severity":"WARNING",
- "notificationByEmail":"true"
- },
- "algorithms":[
- {
- "name":"org.apache.eagle.security.userprofile.util.EigenBasedAnomalyDetection",
- "description":"EigenBasedAnomalyDetection",
- "modelPath":"/models/",
- "featureSet":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete, setOwner, fsck",
- "type":"CSV"
- },
- {
- "name":"org.apache.eagle.security.userprofile.util.KDEBasedAnomalyDetection",
- "description":"DensityBasedAnomalyDetection",
- "modelPath":"/models/",
- "featureSet":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete, setOwner, fsck",
- "type":"CSV"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/pom.xml b/eagle-core/eagle-machinelearning/pom.xml
deleted file mode 100644
index e79ebfc..0000000
--- a/eagle-core/eagle-machinelearning/pom.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eagle-core</artifactId>
- <groupId>org.apache.eagle</groupId>
- <version>0.5.0-incubating-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>eagle-machinelearning-parent</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>eagle-machinelearning-base</module>
- </modules>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/pom.xml b/eagle-core/pom.xml
index 9041840..0bcab2a 100644
--- a/eagle-core/pom.xml
+++ b/eagle-core/pom.xml
@@ -39,7 +39,6 @@
<module>eagle-policy</module>
<module>eagle-alert-parent</module>
<module>eagle-query</module>
- <module>eagle-machinelearning</module>
<module>eagle-embed</module>
<module>eagle-metric</module>
<module>eagle-application-management</module>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
index f8794f3..eac2bfd 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
+org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
[2/2] incubator-eagle git commit: EAGLE-370 absence alert engine
absence alert engine
Posted by yo...@apache.org.
EAGLE-370 absence alert engine
absence alert engine
Author: Yong Zhang <yo...@apache.org>
Reviewer: Yong Zhang
Closes: #262
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1dffec09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1dffec09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1dffec09
Branch: refs/heads/develop
Commit: 1dffec09ccb084c484e0427d2ce2cb567403cf21
Parents: 0b77d94
Author: yonzhang <yo...@gmail.com>
Authored: Mon Jul 11 16:56:46 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Mon Jul 11 16:56:46 2016 -0700
----------------------------------------------------------------------
.../engine/evaluator/PolicyStreamHandlers.java | 8 +-
.../evaluator/absence/AbsenceAlertDriver.java | 68 ++++++++
.../evaluator/absence/AbsenceDailyRule.java | 26 +++
.../evaluator/absence/AbsencePolicyHandler.java | 134 +++++++++++++++
.../engine/evaluator/absence/AbsenceRule.java | 23 +++
.../engine/evaluator/absence/AbsenceWindow.java | 38 +++++
.../absence/AbsenceWindowGenerator.java | 50 ++++++
.../absence/AbsenceWindowProcessor.java | 97 +++++++++++
.../impl/DistinctValuesInTimeWindow.java | 141 ---------------
.../nodata/DistinctValuesInTimeWindow.java | 141 +++++++++++++++
.../evaluator/nodata/NoDataPolicyHandler.java | 28 ++-
.../publisher/impl/JsonEventSerializer.java | 71 ++++++++
.../alert/engine/absence/TestAbsenceDriver.java | 96 +++++++++++
.../absence/TestAbsencePolicyHandler.java | 111 ++++++++++++
.../absence/TestAbsenceWindowGenerator.java | 80 +++++++++
.../absence/TestAbsenceWindowProcessor.java | 70 ++++++++
.../engine/e2e/Integration5AbsenceAlert.java | 94 ++++++++++
.../engine/e2e/SampleClient5AbsenceAlert.java | 93 ++++++++++
.../nodata/TestDistinctValuesInTimeWindow.java | 2 +-
.../alert/engine/nodata/TestNoDataAlert.java | 33 +++-
.../engine/nodata/TestNoDataPolicyHandler.java | 4 +-
.../resources/absence/application-absence.conf | 60 +++++++
.../src/test/resources/absence/datasources.json | 17 ++
.../src/test/resources/absence/policies.json | 24 +++
.../test/resources/absence/publishments.json | 20 +++
.../resources/absence/streamdefinitions.json | 29 ++++
.../src/test/resources/absence/topologies.json | 31 ++++
.../src/test/resources/nodata/policies.json | 5 +-
.../eagle-machinelearning-base/pom.xml | 37 ----
.../apache/eagle/ml/MLAlgorithmEvaluator.java | 50 ------
.../org/apache/eagle/ml/MLAnomalyCallback.java | 28 ---
.../java/org/apache/eagle/ml/MLConstants.java | 24 ---
.../java/org/apache/eagle/ml/MLModelDAO.java | 30 ----
.../org/apache/eagle/ml/MLPolicyEvaluator.java | 170 -------------------
.../eagle/ml/impl/MLAnomalyCallbackImpl.java | 107 ------------
.../apache/eagle/ml/impl/MLModelDAOImpl.java | 62 -------
.../MLPolicyEvaluatorServiceProviderImpl.java | 52 ------
.../org/apache/eagle/ml/model/MLAlgorithm.java | 61 -------
.../apache/eagle/ml/model/MLCallbackResult.java | 137 ---------------
.../eagle/ml/model/MLEntityRepository.java | 25 ---
.../apache/eagle/ml/model/MLModelAPIEntity.java | 67 --------
.../eagle/ml/model/MLPolicyDefinition.java | 82 ---------
.../eagle/ml/utils/MLReflectionUtils.java | 38 -----
....eagle.policy.PolicyEvaluatorServiceProvider | 16 --
....eagle.policy.PolicyEvaluatorServiceProvider | 16 --
.../src/test/resources/application.conf | 57 -------
.../src/test/resources/log4j.properties | 34 ----
.../test/resources/ml-policyDef-UserProfile.txt | 51 ------
eagle-core/eagle-machinelearning/pom.xml | 33 ----
eagle-core/pom.xml | 1 -
....eagle.policy.PolicyEvaluatorServiceProvider | 3 +-
51 files changed, 1443 insertions(+), 1332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index e8f736c..638b240 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -19,19 +19,23 @@ package org.apache.eagle.alert.engine.evaluator;
import java.util.Map;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
public class PolicyStreamHandlers {
public static final String SIDDHI_ENGINE ="siddhi";
public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
+ public static final String ABSENCE_ALERT_ENGINE ="absencealert";
public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
if(SIDDHI_ENGINE.equals(type)) {
return new SiddhiPolicyHandler(sds);
}else if(NO_DATA_ALERT_ENGINE.equals(type)){
return new NoDataPolicyHandler(sds);
+ }else if(ABSENCE_ALERT_ENGINE.equals(type)){
+ return new AbsencePolicyHandler(sds);
}
- throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
+ throw new IllegalArgumentException("Illegal policy stream handler type " + type);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
new file mode 100644
index 0000000..bf142cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/7/16.
+ * this assumes that event comes in time order
+ */
+public class AbsenceAlertDriver {
+ private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class);
+ private List<Object> expectedAttrs;
+ private AbsenceWindowProcessor processor;
+ private AbsenceWindowGenerator windowGenerator;
+
+ public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator){
+ this.expectedAttrs = expectedAttrs;
+ this.windowGenerator = windowGenerator;
+ }
+
+ public void process(List<Object> appearAttrs, long occurTime){
+ // initialize window
+ if(processor == null){
+ processor = nextProcessor(occurTime);
+ LOG.info("initialized a new window {}", processor);
+ }
+ processor.process(appearAttrs, occurTime);
+ AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
+ boolean expired = processor.checkExpired();
+ if(expired){
+ if(status == AbsenceWindowProcessor.OccurStatus.absent){
+ // send alert
+ LOG.info("this is an alert");
+ // figure out next window and set the new window
+ }
+ processor = nextProcessor(occurTime);
+ LOG.info("created a new window {}", processor);
+ }
+ }
+
+ /**
+ * calculate absolute time range based on current timestamp
+ * @param currTime milliseconds
+ * @return
+ */
+ private AbsenceWindowProcessor nextProcessor(long currTime){
+ AbsenceWindow window = windowGenerator.nextWindow(currTime);
+ return new AbsenceWindowProcessor(expectedAttrs, window);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
new file mode 100644
index 0000000..ed50280
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceDailyRule implements AbsenceRule {
+ public static final long DAY_MILLI_SECONDS = 86400*1000L;
+ public long startOffset;
+ public long endOffset;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
new file mode 100644
index 0000000..0a07a27
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Since 7/6/16.
+ * * policy would be like:
+ * {
+ "name": "absenceAlertPolicy",
+ "description": "absenceAlertPolicy",
+ "inputStreams": [
+ "absenceAlertStream"
+ ],
+ "outputStreams": [
+ "absenceAlertStream_out"
+ ],
+ "definition": {
+ "type": "absencealert",
+ "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "absenceAlertStream",
+ "type": "GROUPBY",
+ "columns" : ["jobID"]
+ }
+ ],
+ "parallelismHint": 2
+ }
+ */
+public class AbsencePolicyHandler implements PolicyStreamHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class);
+ private Map<String, StreamDefinition> sds;
+ private volatile PolicyDefinition policyDef;
+ private volatile Collector<AlertStreamEvent> collector;
+ private volatile PolicyHandlerContext context;
+ private volatile List<Integer> expectFieldIndices = new ArrayList<>();
+ private volatile List<Object> expectValues = new ArrayList<>();
+ private AbsenceAlertDriver driver;
+
+ public AbsencePolicyHandler(Map<String, StreamDefinition> sds){
+ this.sds = sds;
+ }
+
+ @Override
+ public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+ this.collector = collector;
+ this.context = context;
+ this.policyDef = context.getPolicyDefinition();
+ List<String> inputStreams = policyDef.getInputStreams();
+ // validate inputStreams has to contain only one stream
+ if(inputStreams.size() != 1)
+ throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
+ // validate outputStream has to contain only one stream
+ if(policyDef.getOutputStreams().size() != 1)
+ throw new IllegalArgumentException("policy outputStream size has to be 1 for absense alert");
+
+ String is = inputStreams.get(0);
+ StreamDefinition sd = sds.get(is);
+
+ String policyValue = policyDef.getDefinition().getValue();
+
+ // assume that absence alert policy value consists of "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset}
+ String[] segments = policyValue.split(",");
+ int offset = 0;
+ // populate wisb field names
+ int numOfFields = Integer.parseInt(segments[offset++]);
+ for(int i = offset; i < offset+numOfFields; i++){
+ String fn = segments[i];
+ expectFieldIndices.add(sd.getColumnIndex(fn));
+ }
+ offset += numOfFields;
+ for(int i = offset; i < offset+numOfFields; i++){
+ String fn = segments[i];
+ expectValues.add(fn);
+ }
+ offset += numOfFields;
+ String absence_window_rule_type = segments[offset++];
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date t1 = sdf.parse(segments[offset++]);
+ rule.startOffset = t1.getTime();
+ Date t2 = sdf.parse(segments[offset++]);
+ rule.endOffset = t2.getTime();
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+ driver = new AbsenceAlertDriver(expectValues, generator);
+ }
+
+ @Override
+ public void send(StreamEvent event) throws Exception {
+ Object[] data = event.getData();
+ List<Object> columnValues = new ArrayList<>();
+ for(int i=0; i<expectFieldIndices.size(); i++){
+ Object o = data[expectFieldIndices.get(i)];
+ // convert value to string
+ columnValues.add(o.toString());
+ }
+
+ driver.process(columnValues, event.getTimestamp());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
new file mode 100644
index 0000000..272d5cf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public interface AbsenceRule {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
new file mode 100644
index 0000000..728e702
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindow {
+ public long startTime;
+ public long endTime;
+
+ public String toString(){
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String t1 = sdf.format(new Date(startTime));
+ String t2 = sdf.format(new Date(endTime));
+ String format = "startTime=%d (%s), endTime=%d (%s)";
+ return String.format(format, startTime, t1, endTime, t2);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
new file mode 100644
index 0000000..6cd0880
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindowGenerator {
+ private AbsenceRule rule;
+ public AbsenceWindowGenerator(AbsenceRule rule){
+ this.rule = rule;
+ }
+
+ /**
+ * @param currTime
+ * @return
+ */
+ public AbsenceWindow nextWindow(long currTime){
+ AbsenceWindow window = new AbsenceWindow();
+ if(rule instanceof AbsenceDailyRule){
+ AbsenceDailyRule r = (AbsenceDailyRule)rule;
+ long adjustment = 0; // if today's window already expires, then adjust to tomorrow's window
+ if(currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset){
+ adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS;
+ }
+ // use current timestamp to round down to day
+ long day = currTime - currTime % AbsenceDailyRule.DAY_MILLI_SECONDS;
+ day += adjustment;
+ window.startTime = day + r.startOffset;
+ window.endTime = day + r.endOffset;
+ return window;
+ }else{
+ throw new UnsupportedOperationException("not supported rule " + rule);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
new file mode 100644
index 0000000..4e8d381
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/6/16.
+ * To process each incoming event
+ * internally maintain state machine to trigger alert when some attribute does not occur within this window
+ */
+public class AbsenceWindowProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(AbsenceWindowProcessor.class);
+ private List<Object> expectAttrs;
+ private AbsenceWindow window;
+ private boolean expired; // to mark if the time range has been went through
+ private OccurStatus status = OccurStatus.not_sure;
+
+ public enum OccurStatus{
+ not_sure,
+ occured,
+ absent
+ }
+
+ public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window){
+ this.expectAttrs = expectAttrs;
+ this.window = window;
+ expired = false;
+ }
+
+ /**
+ * return true if it is certain that expected attributes don't occur during startTime and endTime, else return false
+ * @param appearAttrs
+ * @param occurTime
+ * @return
+ */
+ public void process(List<Object> appearAttrs, long occurTime){
+ if(expired)
+ throw new IllegalStateException("Expired window can't recieve events");
+ switch(status) {
+ case not_sure:
+ if(occurTime < window.startTime) {
+ break;
+ }else if(occurTime >= window.startTime &&
+ occurTime <= window.endTime) {
+ if(expectAttrs.equals(appearAttrs)) {
+ status = OccurStatus.occured;
+ }
+ break;
+ }else{
+ status = OccurStatus.absent;
+ break;
+ }
+ case occured:
+ if(occurTime > window.endTime)
+ expired = true;
+ break;
+ default:
+ break;
+ }
+ // reset status
+ if(status == OccurStatus.absent){
+ expired = true;
+ }
+ }
+
+ public OccurStatus checkStatus(){
+ return status;
+ }
+ public boolean checkExpired(){
+ return expired;
+ }
+ public AbsenceWindow currWindow(){
+ return window;
+ }
+
+ public String toString(){
+ return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
deleted file mode 100644
index 8a681da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * Since 6/28/16.
- * to get distinct values within a specified time window
- * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
- * timeSortedMap : map sorted by timestamp first and then value
- * With the above 2 data structure, we can get distinct values in LOG(N)
- */
-public class DistinctValuesInTimeWindow {
- public static class ValueAndTime{
- Object value;
- long timestamp;
- public ValueAndTime(Object value, long timestamp){
- this.value = value;
- this.timestamp = timestamp;
- }
-
- public String toString(){
- return "[" + value + "," + timestamp + "]";
- }
-
- public int hashCode(){
- return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
- }
-
- public boolean equals(Object that){
- if(!(that instanceof ValueAndTime))
- return false;
- ValueAndTime another = (ValueAndTime)that;
- return another.timestamp == this.timestamp && another.value.equals(this.value);
- }
- }
-
- public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
- @Override
- public int compare(ValueAndTime o1, ValueAndTime o2) {
- if(o1.timestamp != o2.timestamp)
- return (o1.timestamp > o2.timestamp) ? 1 : -1;
- if(o1.value.equals(o2.value))
- return 0;
- else {
- // this is not strictly correct, but I don't want to write too many comparators here :-)
- if(o1.hashCode() > o2.hashCode())
- return 1;
- else
- return -1;
- }
- }
- }
-
- /**
- * map from value to max timestamp for this value
- */
- private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
- /**
- * map sorted by time(max timestamp for the value) and then value
- */
- private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
- private long maxTimestamp = 0L;
- private long window;
- private boolean windowSlided;
-
- /**
- * @param window - milliseconds
- */
- public DistinctValuesInTimeWindow(long window){
- this.window = window;
- }
-
- public void send(Object value, long timestamp){
- ValueAndTime vt = new ValueAndTime(value, timestamp);
-
- // todo think of time out of order
- if(valueMaxTimeMap.containsKey(value)){
- // remove that entry with old timestamp in timeSortedMap
- long oldTime = valueMaxTimeMap.get(value);
- if(oldTime >= timestamp){
- // no any effect as the new timestamp is equal or even less than old timestamp
- return;
- }
- timeSortedMap.remove(new ValueAndTime(value, oldTime));
- }
- // insert entry with new timestamp in timeSortedMap
- timeSortedMap.put(vt, vt);
- // update new timestamp in valueMaxTimeMap
- valueMaxTimeMap.put(value, timestamp);
-
- // evict old entries
- // store max timestamp if possible
- maxTimestamp = Math.max(maxTimestamp, timestamp);
-
- // check if some values should be evicted because of time window
- Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
- while(it.hasNext()){
- Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
- if(entry.getKey().timestamp < maxTimestamp - window){
- // should remove the entry in valueMaxTimeMap and timeSortedMap
- valueMaxTimeMap.remove(entry.getKey().value);
- windowSlided = true;
-
- it.remove();
- }else {
- break;
- }
- }
- }
-
- public Map<Object, Long> distinctValues(){
- return valueMaxTimeMap;
- }
-
- public boolean windowSlided(){
- return windowSlided;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..676357a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Since 6/28/16.
+ * to get distinct values within a specified time window
+ * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
+ * timeSortedMap : map sorted by timestamp first and then value
+ * With the above 2 data structure, we can get distinct values in LOG(N)
+ */
+public class DistinctValuesInTimeWindow {
+ public static class ValueAndTime{
+ Object value;
+ long timestamp;
+ public ValueAndTime(Object value, long timestamp){
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public String toString(){
+ return "[" + value + "," + timestamp + "]";
+ }
+
+ public int hashCode(){
+ return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
+ }
+
+ public boolean equals(Object that){
+ if(!(that instanceof ValueAndTime))
+ return false;
+ ValueAndTime another = (ValueAndTime)that;
+ return another.timestamp == this.timestamp && another.value.equals(this.value);
+ }
+ }
+
+ public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
+ @Override
+ public int compare(ValueAndTime o1, ValueAndTime o2) {
+ if(o1.timestamp != o2.timestamp)
+ return (o1.timestamp > o2.timestamp) ? 1 : -1;
+ if(o1.value.equals(o2.value))
+ return 0;
+ else {
+ // this is not strictly correct, but I don't want to write too many comparators here :-)
+ if(o1.hashCode() > o2.hashCode())
+ return 1;
+ else
+ return -1;
+ }
+ }
+ }
+
+ /**
+ * map from value to max timestamp for this value
+ */
+ private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+ /**
+ * map sorted by time(max timestamp for the value) and then value
+ */
+ private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
+ private long maxTimestamp = 0L;
+ private long window;
+ private boolean windowSlided;
+
+ /**
+ * @param window - milliseconds
+ */
+ public DistinctValuesInTimeWindow(long window){
+ this.window = window;
+ }
+
+ public void send(Object value, long timestamp){
+ ValueAndTime vt = new ValueAndTime(value, timestamp);
+
+ // todo think of time out of order
+ if(valueMaxTimeMap.containsKey(value)){
+ // remove that entry with old timestamp in timeSortedMap
+ long oldTime = valueMaxTimeMap.get(value);
+ if(oldTime >= timestamp){
+ // no any effect as the new timestamp is equal or even less than old timestamp
+ return;
+ }
+ timeSortedMap.remove(new ValueAndTime(value, oldTime));
+ }
+ // insert entry with new timestamp in timeSortedMap
+ timeSortedMap.put(vt, vt);
+ // update new timestamp in valueMaxTimeMap
+ valueMaxTimeMap.put(value, timestamp);
+
+ // evict old entries
+ // store max timestamp if possible
+ maxTimestamp = Math.max(maxTimestamp, timestamp);
+
+ // check if some values should be evicted because of time window
+ Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
+ while(it.hasNext()){
+ Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
+ if(entry.getKey().timestamp < maxTimestamp - window){
+ // should remove the entry in valueMaxTimeMap and timeSortedMap
+ valueMaxTimeMap.remove(entry.getKey().value);
+ windowSlided = true;
+
+ it.remove();
+ }else {
+ break;
+ }
+ }
+ }
+
+ public Map<Object, Long> distinctValues(){
+ return valueMaxTimeMap;
+ }
+
+ public boolean windowSlided(){
+ return windowSlided;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index ed13f71..6e5beb6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -29,7 +29,6 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.utils.TimePeriodUtils;
@@ -50,6 +49,29 @@ import org.slf4j.LoggerFactory;
* fixed fields and dynamic fields
* fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
* dynamic fields depend on wisb type.
+ *
+ * policy would be like:
+ * {
+ "name": "noDataAlertPolicy",
+ "description": "noDataAlertPolicy",
+ "inputStreams": [
+ "noDataAlertStream"
+ ],
+ "outputStreams": [
+ "noDataAlertStream_out"
+ ],
+ "definition": {
+ "type": "nodataalert",
+ "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "noDataAlertStream",
+ "type": "GROUPBY"
+ }
+ ],
+ "parallelismHint": 2
+ }
*/
public class NoDataPolicyHandler implements PolicyStreamHandler{
private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
@@ -61,10 +83,10 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
// reuse PolicyDefinition.defintion.value field to store full set of values separated by comma
private volatile PolicyDefinition policyDef;
- private volatile DistinctValuesInTimeWindow distinctWindow;
private volatile Collector<AlertStreamEvent> collector;
private volatile PolicyHandlerContext context;
private volatile NoDataWisbType wisbType;
+ private volatile DistinctValuesInTimeWindow distinctWindow;
public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
this.sds = sds;
@@ -161,4 +183,4 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
public void close() throws Exception {
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
new file mode 100644
index 0000000..bf2a954
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Jul 9, 2016
+ *
+ */
+public class JsonEventSerializer implements IEventSerializer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JsonEventSerializer.class);
+
+ @SuppressWarnings("rawtypes")
+ public JsonEventSerializer(Map stormConf) throws Exception {
+ }
+
+ @Override
+ public Object serialize(AlertStreamEvent event) {
+ String result = streamEventToJson(event);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("serialized alert event : ", result);
+ }
+ return result;
+ }
+
+ public String streamEventToJson(AlertStreamEvent event) {
+ Map<String, Object> jsonMap = new HashMap<String, Object>();
+ jsonMap.put("policyId", event.getPolicyId());
+ jsonMap.put("streamId", event.getStreamId());
+ jsonMap.put("createBy", event.getCreatedBy());
+ jsonMap.put("createTime", event.getCreatedTime());
+ // data
+ int size = event.getData().length;
+ List<StreamColumn> columns = event.getSchema().getColumns();
+ for (int i = 0; i < size; i++) {
+ if (columns.size() < i) {
+ // redudant check to log inconsistency
+ LOG.error(" strema event data have different lenght compare to column definition! ");
+ } else {
+ jsonMap.put(columns.get(i).getName(), event.getData()[i]);
+ }
+ }
+ return JsonUtils.writeValueAsString(jsonMap);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
new file mode 100644
index 0000000..ca5bfdf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceAlertDriver;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16.
+ */
+public class TestAbsenceDriver {
+ @Test
+ public void testAbsence() throws Exception{
+ // from 2PM to 3PM each day
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+ List<Object> expectAttrs = Arrays.asList("host1");
+ AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
+
+ // first event came in 2016-07-08 11:20:00
+ String date = "2016-07-08 11:20:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long baseOccurTime = d.getTime();
+
+ // first event
+ driver.process(Arrays.asList("host2"), baseOccurTime);
+ // event after 1 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+ // event after 2 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+ // event after 3 hour, enter this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+ // event after 3.5 hour, still in this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000 + 1800*1000);
+ // event after 4 hour, exit this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+ }
+
+ @Test
+ public void testOccurrence() throws Exception{
+ // from 2PM to 3PM each day
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+ List<Object> expectAttrs = Arrays.asList("host1");
+ AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
+
+ // first event came in 2016-07-08 11:20:00
+ String date = "2016-07-08 11:20:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long baseOccurTime = d.getTime();
+
+ // first event
+ driver.process(Arrays.asList("host2"), baseOccurTime);
+ // event after 1 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+ // event after 2 hour
+ driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+ // event after 3 hour, enter this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+ // event after 3.5 hour, still in this window
+ driver.process(Arrays.asList("host1"), baseOccurTime + 3*3600*1000 + 1800*1000);
+ // event after 4 hour, exit this window
+ driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
new file mode 100644
index 0000000..7f325c4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Since 7/8/16.
+ */
+public class TestAbsencePolicyHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(TestAbsencePolicyHandler.class);
+ private static final String inputStream = "testInputStream";
+ private static final String outputStream = "testOutputStream";
+
+ @Test
+ public void test() throws Exception{
+ test(buildPolicyDef_provided());
+ }
+
+ public void test(PolicyDefinition pd) throws Exception{
+ Map<String, StreamDefinition> sds = new HashMap<>();
+ StreamDefinition sd = buildStreamDef();
+ sds.put("testInputStream", sd);
+ AbsencePolicyHandler handler = new AbsencePolicyHandler(sds);
+
+ PolicyHandlerContext context = new PolicyHandlerContext();
+ context.setPolicyDefinition(pd);
+ handler.prepare(new TestCollector(), context);
+
+ handler.send(buildStreamEvt(0, "job1", "running"));
+ }
+
+ private static class TestCollector implements Collector {
+ @Override
+ public void emit(Object o) {
+ AlertStreamEvent e = (AlertStreamEvent)o;
+ Object[] data = e.getData();
+ Assert.assertEquals("host2", data[1]);
+ LOG.info(e.toString());
+ }
+ }
+
+ private PolicyDefinition buildPolicyDef_provided(){
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+ def.setType("absencealert");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList(inputStream));
+ pd.setOutputStreams(Arrays.asList(outputStream));
+ pd.setName("absencealert-test");
+ return pd;
+ }
+
+ private StreamDefinition buildStreamDef(){
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("jobID");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("status");
+ valueColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testStreamId");
+ return sd;
+ }
+
+ private StreamEvent buildStreamEvt(long ts, String jobID, String status){
+ StreamEvent e = new StreamEvent();
+ e.setData(new Object[]{ts, jobID, status});
+ e.setStreamId(inputStream);
+ e.setTimestamp(ts);
+ return e;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
new file mode 100644
index 0000000..e2345c9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16.
+ */
+public class TestAbsenceWindowGenerator {
+ @Test
+ public void testWindowInToday() throws Exception{
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ // from 2PM to 3PM each day
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+
+ // get current time
+ String date = "2016-07-08 00:00:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long startTimeOfDay = d.getTime();
+
+ String currDate = "2016-07-08 11:30:29";
+ df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ d = df.parse(currDate);
+ AbsenceWindow window = generator.nextWindow(d.getTime());
+ Assert.assertEquals(startTimeOfDay+rule.startOffset, window.startTime);
+ }
+
+ @Test
+ public void testWindowInTomorrow() throws Exception{
+ AbsenceDailyRule rule = new AbsenceDailyRule();
+ // from 2PM to 3PM each day
+ rule.startOffset = 14*3600*1000;
+ rule.endOffset = 15*3600*1000;
+ AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+
+ // get current time
+ String date = "2016-07-08 00:00:00";
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ Date d = df.parse(date);
+ long startTimeOfDay = d.getTime();
+
+ String currDate = "2016-07-08 18:20:19";
+ df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ df.setTimeZone(TimeZone.getTimeZone("UTC"));
+ d = df.parse(currDate);
+ AbsenceWindow window = generator.nextWindow(d.getTime());
+ // this needs adjustment for one day
+ Assert.assertEquals(startTimeOfDay+rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
new file mode 100644
index 0000000..a47c7a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Since 7/6/16.
+ */
+public class TestAbsenceWindowProcessor {
+ @Test
+ public void testDataMissing(){
+ List<Object> expectedHosts = Arrays.asList("host1");
+ AbsenceWindow window = new AbsenceWindow();
+ window.startTime = 100L;
+ window.endTime = 200L;
+ AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+ processor.process(Arrays.asList("host2"), 90);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host3"), 101);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host3"), 138);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host2"), 189);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host2"), 201);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.absent);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testDataExists(){
+ List<Object> expectedHosts = Arrays.asList("host1");
+ AbsenceWindow window = new AbsenceWindow();
+ window.startTime = 100L;
+ window.endTime = 200L;
+ AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+ processor.process(Arrays.asList("host2"), 90);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host3"), 101);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
+ processor.process(Arrays.asList("host1"), 138);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
+ processor.process(Arrays.asList("host2"), 189);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
+ processor.process(Arrays.asList("host2"), 201);
+ Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
+ Assert.assertEquals(processor.checkExpired(), true);
+ processor.process(Arrays.asList("host2"), 225);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
new file mode 100644
index 0000000..52d1e5d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Since 6/29/16.
+ */
+public class Integration5AbsenceAlert {
+ private String[] args;
+
+ private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+ private static KafkaEmbedded kafka;
+
+ @BeforeClass
+ public static void setup() {
+ // FIXME : start local kafka
+ }
+
+ @AfterClass
+ public static void end() {
+ if (kafka != null) {
+ kafka.shutdown();
+ }
+ }
+ @Test @Ignore
+ public void testTriggerAbsenceAlert() throws Exception{
+ System.setProperty("config.resource", "/absence/application-absence.conf");
+ ConfigFactory.invalidateCaches();
+ Config config = ConfigFactory.load();
+
+ System.out.println("loading metadatas...");
+ Integration1.loadMetadatas("/absence/", config);
+ System.out.println("loading metadatas done!");
+
+
+ executors.submit(() -> {
+ try {
+ UnitTopologyMain.main(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ // wait 20 seconds for topology to bring up
+ try{
+ Thread.sleep(20000);
+ }catch(Exception ex){}
+
+ // send mock data
+ executors.submit(() -> {
+ try {
+ SampleClient5AbsenceAlert.main(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+
+ Utils.sleep(1000 * 5l);
+ while (true) {
+ Integration1.proactive_schedule(config);
+
+ Utils.sleep(1000 * 60l * 5);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
new file mode 100644
index 0000000..0256324
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Since 6/29/16.
+ */
+public class SampleClient5AbsenceAlert {
+ private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class);
+ private static long currentTimestamp = 1467240000000L;
+ private static long interval = 3000L;
+ public static void main(String[] args) throws Exception {
+ System.setProperty("config.resource", "/absence/application-absence.conf");
+ ConfigFactory.invalidateCaches();
+
+ Config config = ConfigFactory.load();
+ KafkaProducer producer = createProducer(config);
+ ProducerRecord record = null;
+ record = new ProducerRecord("absenceAlertTopic", createEvent("job1"));
+ producer.send(record);
+ record = new ProducerRecord("absenceAlertTopic", createEvent("job2"));
+ producer.send(record);
+ record = new ProducerRecord("absenceAlertTopic", createEvent("host3"));
+ producer.send(record);
+ }
+
+ private static class AbsenceEvent{
+ @JsonProperty
+ long timestamp;
+ @JsonProperty
+ String jobID;
+ @JsonProperty
+ String status;
+
+ public String toString(){
+ return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status;
+ }
+ }
+
+ private static String createEvent(String jobID) throws Exception{
+ AbsenceEvent e = new AbsenceEvent();
+ long expectTS = currentTimestamp + interval;
+ // adjust back 1 second random
+ long adjust = Math.round(2*Math.random());
+ e.timestamp = expectTS-adjust;
+ e.jobID = jobID;
+ e.status = "running";
+ LOG.info("sending event {} ", e);
+ ObjectMapper mapper = new ObjectMapper();
+ String value = mapper.writeValueAsString(e);
+ return value;
+ }
+
+
+ public static KafkaProducer<String, String> createProducer(Config config) {
+ String servers = config.getString("kafkaProducer.bootstrapServers");
+ Properties configMap = new Properties();
+ configMap.put("bootstrap.servers", servers);
+ configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("request.required.acks", "1");
+ configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap);
+ return proceduer;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
index f97b1a8..27744a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -21,7 +21,7 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
import org.junit.Test;
/**
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index 569a3b0..f50ad15 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -80,4 +80,35 @@ public class TestNoDataAlert {
// }
// Thread.sleep(10000);
}
-}
+
+ /**
+ * only alert when the successive 2 events has number of missing blocks changed
+ *from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
+ */
+ @Test
+ public void testMissingBlock() throws Exception{
+ ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+ "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+
+ "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+
+ "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+
+ "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+
+ "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
+ "b.site as site insert into outputStream;"
+ );
+
+ runtime.addCallback("outputStream", new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ EventPrinter.print(events);
+ }
+ });
+
+ runtime.start();
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
+
+
+ Thread.sleep(5000);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
index 6c48def..6305da8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -85,7 +85,7 @@ public class TestNoDataPolicyHandler {
PolicyDefinition pd = new PolicyDefinition();
PolicyDefinition.Definition def = new PolicyDefinition.Definition();
def.setValue("PT1M,provided,1,host,host1,host2");
- def.setType("string");
+ def.setType("nodataalert");
pd.setDefinition(def);
pd.setInputStreams(Arrays.asList(inputStream));
pd.setOutputStreams(Arrays.asList(outputStream));
@@ -97,7 +97,7 @@ public class TestNoDataPolicyHandler {
PolicyDefinition pd = new PolicyDefinition();
PolicyDefinition.Definition def = new PolicyDefinition.Definition();
def.setValue("PT1M,dynamic,1,host");
- def.setType("string");
+ def.setType("nodataalert");
pd.setDefinition(def);
pd.setInputStreams(Arrays.asList(inputStream));
pd.setOutputStreams(Arrays.asList(outputStream));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
new file mode 100644
index 0000000..82e3f15
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+ "topology" : {
+ "name" : "alertUnitTopology_1",
+ "numOfTotalWorkers": 20,
+ "numOfSpoutTasks" : 1,
+ "numOfRouterBolts" : 4,
+ "numOfAlertBolts" : 10,
+ "numOfPublishTasks" : 1,
+ "localMode" : "true"
+ },
+ "spout" : {
+ "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+ "kafkaBrokerZkBasePath": "/brokers",
+ "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+ "stormKafkaTransactionZkQuorum": "",
+ "stormKafkaTransactionZkPath": "/consumers",
+ "stormKafkaEagleConsumer": "eagle_consumer",
+ "stormKafkaStateUpdateIntervalMs": 2000,
+ "stormKafkaFetchSizeBytes": 1048586,
+ },
+ "zkConfig" : {
+ "zkQuorum" : "sandbox.hortonworks.com:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "dynamicConfigSource" : {
+ "initDelayMillis": 3000,
+ "delayMillis" : 10000
+ },
+ "metadataService": {
+ "context" : "/rest",
+ "host" : "localhost",
+ "port" : 8080
+ },
+ "coordinatorService": {
+ "host": "localhost",
+ "port": "8080",
+ "context" : "/rest"
+ },
+ "kafkaProducer": {
+ "bootstrapServers": "sandbox.hortonworks.com:6667"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
new file mode 100644
index 0000000..ed4d638
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
@@ -0,0 +1,17 @@
+[
+ {
+ "name": "absenceAlertDataSource",
+ "type": "KAFKA",
+ "properties": {},
+ "topic": "absenceAlertTopic",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+ "codec": {
+ "streamNameSelectorProp": {
+ "userProvidedStreamName": "noDataAlertStream"
+ },
+ "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat": ""
+ }
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
new file mode 100644
index 0000000..a7ce7dc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
@@ -0,0 +1,24 @@
+[
+ {
+ "name": "absenceAlertPolicy",
+ "description": "absenceAlertPolicy",
+ "inputStreams": [
+ "absenceAlertStream"
+ ],
+ "outputStreams": [
+ "absenceAlertStream_out"
+ ],
+ "definition": {
+ "type": "absencealert",
+ "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "absenceAlertStream",
+ "type": "GROUPBY",
+ "columns" : ["jobID"]
+ }
+ ],
+ "parallelismHint": 2
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
new file mode 100644
index 0000000..6e9260f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
@@ -0,0 +1,20 @@
+[
+ {
+ "name":"test-stream-output",
+ "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "policyIds": [
+ "absenceAlertPolicy"
+ ],
+ "properties": {
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "yonzhang@ebay.com",
+ "smtp.server":"atom.corp.ebay.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ },
+ "dedupIntervalMin" : "PT5M",
+ "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
new file mode 100644
index 0000000..4bd7319
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
@@ -0,0 +1,29 @@
+[
+ {
+ "streamId": "absenceAlertStream",
+ "dataSource": "absenceAlertDataSource",
+ "description": "the data stream for testing absence alert",
+ "validate": false,
+ "timeseries": false,
+ "columns": [
+ {
+ "name": "jobID",
+ "type": "STRING",
+ "defaultValue": "",
+ "required": true
+ },
+ {
+ "name": "timestamp",
+ "type": "LONG",
+ "defaultValue": 0,
+ "required": true
+ },
+ {
+ "name": "status",
+ "type": "STRING",
+ "defaultValue": "running",
+ "required": true
+ }
+ ]
+ }
+]
\ No newline at end of file