You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/11 23:53:49 UTC

[1/2] incubator-eagle git commit: EAGLE-370 absence alert engine

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 0b77d947a -> 1dffec09c


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
new file mode 100644
index 0000000..2726aff
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/topologies.json
@@ -0,0 +1,31 @@
+[
+  {
+    "name": "alertUnitTopology_1",
+    "numOfSpout":1,
+    "numOfAlertBolt": 10,
+    "numOfGroupBolt": 4,
+    "spoutId": "alertEngineSpout",
+    "groupNodeIds" : [
+      "streamRouterBolt0",
+      "streamRouterBolt1",
+      "streamRouterBolt2",
+      "streamRouterBolt3"
+    ],
+    "alertBoltIds": [
+      "alertBolt0",
+      "alertBolt1",
+      "alertBolt2",
+      "alertBolt3",
+      "alertBolt4",
+      "alertBolt5",
+      "alertBolt6",
+      "alertBolt7",
+      "alertBolt8",
+      "alertBolt9"
+    ],
+    "pubBoltId" : "alertPublishBolt",
+    "spoutParallelism": 1,
+    "groupParallelism": 1,
+    "alertParallelism": 1
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
index 74f3016..a8e4a9c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
@@ -10,12 +10,13 @@
 		],
 		"definition": {
 			"type": "nodataalert",
 			"value": "PT1M,dynamic,1,host"
 		},
 		"partitionSpec": [
 			{
 				"streamId": "noDataAlertStream",
-				"type": "GROUPBY"
+				"type": "GROUPBY",
+				"columns" : ["host"]
 			}
 		],
 		"parallelismHint": 2

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml
deleted file mode 100644
index 390705d..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/pom.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.eagle</groupId>
-    <artifactId>eagle-machinelearning-parent</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <artifactId>eagle-machinelearning-base</artifactId>
-  <name>eagle-machinelearning-base</name>
-  <url>http://maven.apache.org</url>
-  <dependencies>
-    <dependency>
-    	<groupId>org.apache.eagle</groupId>
-    	<artifactId>eagle-alert-process</artifactId>
-    	<version>${project.version}</version>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
deleted file mode 100644
index 800b6e0..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAlgorithmEvaluator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.ml.model.MLAlgorithm;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-
-/**
- * Machine Learning Algorithm Evaluator
- */
-public interface MLAlgorithmEvaluator {
-    /**
-     * Prepare Machine learning algorithm
-     *
-     * @param algorithm MLAlgorithm instance
-     */
-    public void init(MLAlgorithm algorithm, Config config, PolicyEvaluationContext context);
-
-    /**
-     * Evaluate input user profile model
-     *
-     * @param data ValuesArray
-     * @throws Exception
-     */
-	public void evaluate(ValuesArray data) throws Exception;
-
-    /**
-     * Register callback
-     *
-     * @param callbackObj MachineLearningCallback
-     * @throws Exception
-     */
-	public void register(MLAnomalyCallback callbackObj) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
deleted file mode 100644
index 5005c42..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.ml.model.MLCallbackResult;
-
-public interface MLAnomalyCallback {
-    /**
-     * @param callbackResult call-backed result
-     * @param alertContext context
-     */
-	void receive(MLCallbackResult callbackResult,PolicyEvaluationContext alertContext);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java
deleted file mode 100644
index 9197d59..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLConstants.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.ml;
-
-public class MLConstants {
-	public final static String ML_MODEL_SERVICE_NAME = "MLModelService";
-	public final static String ALGORITHM = "algorithm";
-	public final static String USER = "user";
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java
deleted file mode 100644
index 13eb7e1..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLModelDAO.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import org.apache.eagle.ml.model.MLModelAPIEntity;
-
-import java.util.List;
-
-public interface MLModelDAO {
-    /**
-     * @param user
-     * @param algorithm
-     * @return
-     */
-	List<MLModelAPIEntity> findMLModelByContext(String user, String algorithm);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
deleted file mode 100644
index 36bcaec..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml;
-
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.ml.impl.MLAnomalyCallbackImpl;
-import org.apache.eagle.ml.model.MLAlgorithm;
-import org.apache.eagle.ml.model.MLPolicyDefinition;
-import org.apache.eagle.ml.utils.MLReflectionUtils;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEntity> {
-	private static Logger LOG = LoggerFactory.getLogger(MLPolicyEvaluator.class);
-    private volatile MLRuntime mlRuntime;
-	private Config config;
-	private Map<String,String> context;
-	private final PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext;
-
-	private class MLRuntime{
-		MLPolicyDefinition mlPolicyDef;
-		MLAlgorithmEvaluator[] mlAlgorithmEvaluators;
-		List<MLAnomalyCallback> mlAnomalyCallbacks = new ArrayList<>();
-	}
-
-	public MLPolicyEvaluator(Config config, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext, AbstractPolicyDefinition policyDef, String[] sourceStreams){
-		this(config, evalContext, policyDef, sourceStreams, false);
-	}
-
-	/**
-	 * needValidation does not take effect for machine learning use case
-	 * @param policyDef
-	 * @param sourceStreams
-	 * @param needValidation
-	 */
-	public MLPolicyEvaluator(Config config, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> evalContext, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
-		this.config = config;
-		this.evalContext = evalContext;
-        LOG.info("Initializing policy named: " + evalContext.policyId);
-        this.context = new HashMap<>();
-        this.context.put(Constants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
-		this.init(policyDef);
-	}
-
-	public void init(AbstractPolicyDefinition policyDef){
-		LOG.info("Initializing MLPolicyEvaluator ...");
-		try{
-			mlRuntime = newMLRuntime((MLPolicyDefinition) policyDef);
-		}catch(Exception e){
-			LOG.error("ML Runtime creation failed: " + e.getMessage());
-		}
-	}
-
-	private MLRuntime newMLRuntime(MLPolicyDefinition mlPolicyDef) {
-		MLRuntime runtime = new MLRuntime();
-
-		try{
-			runtime.mlPolicyDef = mlPolicyDef;
-
-			LOG.info("policydef: " + ((runtime.mlPolicyDef == null)? "policy definition is null": "policy definition is not null"));
-			Properties alertContext = runtime.mlPolicyDef.getContext();
-			LOG.info("alert context received null? " + ((alertContext == null? "yes": "no")));
-			MLAnomalyCallback callbackImpl = new MLAnomalyCallbackImpl(this, config);
-			runtime.mlAnomalyCallbacks.add(callbackImpl);
-
-			MLAlgorithm[] mlAlgorithms = mlPolicyDef.getAlgorithms();
-			runtime.mlAlgorithmEvaluators = new MLAlgorithmEvaluator[mlAlgorithms.length];
-			LOG.info("mlAlgorithms size:: " + mlAlgorithms.length);
-			int i = 0; 
-			for(MLAlgorithm algorithm:mlAlgorithms){
-                MLAlgorithmEvaluator mlAlgorithmEvaluator = MLReflectionUtils.newMLAlgorithmEvaluator(algorithm);
-                mlAlgorithmEvaluator.init(algorithm, config, evalContext);
-				runtime.mlAlgorithmEvaluators[i] =  mlAlgorithmEvaluator;
-				LOG.info("mlAlgorithmEvaluator: " + mlAlgorithmEvaluator.toString());
-						mlAlgorithmEvaluator.register(callbackImpl);
-				i++;
-			}
-		}catch(Exception ex){
-            LOG.error("Failed to create runtime for policy named: "+this.getPolicyName(),ex);
-		}
-		return runtime;
-	}
-	
-	@Override
-	public void evaluate(ValuesArray data) throws Exception {
-		LOG.info("Evaluate called with input: " + data.size());
-		synchronized(mlRuntime){
-			for(MLAlgorithmEvaluator mlAlgorithm:mlRuntime.mlAlgorithmEvaluators){
-				mlAlgorithm.evaluate(data);
-			}
-		}
-	}
-
-	@Override
-	public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) {
-		LOG.info("onPolicyUpdate called");
-		AbstractPolicyDefinition policyDef = null;		
-		try {
-			policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), 
-					AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get("policyType")));
-		} catch (Exception ex) {
-			LOG.error("initial policy def error, ", ex);
-		}
-		MLRuntime previous = mlRuntime;
-		mlRuntime = newMLRuntime((MLPolicyDefinition) policyDef);
-		synchronized (previous) {
-			previous.mlAnomalyCallbacks = null;
-			previous.mlAlgorithmEvaluators = null;
-			previous.mlPolicyDef = null; 
-		}
-		previous = null;
-	}
-
-	@Override
-	public void onPolicyDelete() {
-		LOG.info("onPolicyDelete called");
-		MLRuntime previous = mlRuntime;
-		synchronized (previous) {
-			previous.mlAnomalyCallbacks = null;
-			previous.mlAlgorithmEvaluators = null;
-			previous.mlPolicyDef = null; 
-		}
-		previous = null;
-	}
-
-    public String getPolicyName() {
-		return evalContext.policyId;
-	}
-	
-	@Override
-	public Map<String, String> getAdditionalContext() {
-		return this.context;
-	}
-	
-	public List<String> getOutputStreamAttrNameList() {
-		return new ArrayList<String>();
-	}
-
-	@Override
-	public boolean isMarkdownEnabled() { return false; }
-
-	@Override
-	public String getMarkdownReason() { return null; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
deleted file mode 100644
index 1e8c013..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.impl;
-
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.ml.MLAnomalyCallback;
-import org.apache.eagle.ml.MLPolicyEvaluator;
-import org.apache.eagle.ml.model.MLCallbackResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
-	private static Logger LOG = LoggerFactory.getLogger(MLAnomalyCallbackImpl.class);
-	private MLPolicyEvaluator mlAlertEvaluator;
-	private Config config;
-
-
-    public static final String source = ManagementFactory.getRuntimeMXBean().getName();
-	
-	public MLAnomalyCallbackImpl(MLPolicyEvaluator mlAlertEvaluator, Config config){
-		this.mlAlertEvaluator = mlAlertEvaluator;
-		this.config = config;
-	}
-
-    /**
-     * TODO: generate alert
-     *
-     * @param aResult
-     * @param alertContext context
-     */
-	@Override
-	public void receive(MLCallbackResult aResult,PolicyEvaluationContext alertContext) {
-		LOG.info("Receive called with : " + aResult.toString());
-        AlertAPIEntity alert = renderAlert(aResult,alertContext);
-        alertContext.alertExecutor.onEvalEvents(alertContext, Arrays.asList(alert));
-	}
-
-    private AlertAPIEntity renderAlert(MLCallbackResult aResult,PolicyEvaluationContext alertContext){
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
-        String applicatioin = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
-
-        AlertAPIEntity entity = new AlertAPIEntity();
-        entity.setDescription(aResult.toString());
-
-        Map<String, String> tags = new HashMap<>();
-        tags.put(EagleConfigConstants.SITE, site);
-        tags.put(EagleConfigConstants.APPLICATION, applicatioin);
-        tags.put(Constants.SOURCE_STREAMS, (String)alertContext.evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS));
-        tags.put(Constants.POLICY_ID, alertContext.policyId);
-        tags.put(Constants.ALERT_SOURCE, source);
-        tags.put(Constants.ALERT_EXECUTOR_ID, alertContext.alertExecutor.getExecutorId());
-        entity.setTags(tags);
-
-        entity.setTimestamp(aResult.getTimestamp());
-
-        AlertContext context = new AlertContext();
-
-        if(aResult.getContext() != null) context.addAll(aResult.getContext());
-
-        String alertMessage = "Anomaly activities detected by algorithm ["+aResult.getAlgorithmName()+"] with information: " + aResult.toString() ;
-        context.addProperty(Constants.ALERT_EVENT, aResult.toString());
-        context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
-        context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
-
-        try {
-            site = config.getString("eagleProps.site");
-            applicatioin = config.getString("eagleProps.application");
-            context.addProperty(EagleConfigConstants.APPLICATION, applicatioin);
-            context.addProperty(EagleConfigConstants.SITE, site);
-        } catch (Exception ex) {
-            LOG.error("site, dataSource not set in config file, ", ex);
-        }
-
-        context.addProperty(EagleConfigConstants.APPLICATION, applicatioin);
-        context.addProperty(EagleConfigConstants.SITE, site);
-        context.addProperty(Constants.POLICY_NAME, alertContext.policyId);
-
-        entity.setAlertContext(context.toJsonString());
-        return entity;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java
deleted file mode 100644
index 1e3067e..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLModelDAOImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.impl;
-
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.ml.MLConstants;
-import org.apache.eagle.ml.MLModelDAO;
-import org.apache.eagle.ml.model.MLModelAPIEntity;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.apache.commons.lang.time.DateUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class MLModelDAOImpl implements MLModelDAO {
-	private final Logger LOG = LoggerFactory.getLogger(MLModelDAOImpl.class);
-	private final EagleServiceConnector connector;
-
-	public MLModelDAOImpl(EagleServiceConnector connector){
-		this.connector = connector;
-	}
-
-	@Override
-	public List<MLModelAPIEntity> findMLModelByContext(String user, String algorithm) {
-		try {
-			IEagleServiceClient client = new EagleServiceClientImpl(connector);
-			String query = MLConstants.ML_MODEL_SERVICE_NAME + "[@user=\"" + user + "\" AND @algorithm=\""
-						+ algorithm + "\"]{*}";
-			GenericServiceAPIResponseEntity<MLModelAPIEntity> response =  client.search().startTime(0)
-																		                 .endTime(10 * DateUtils.MILLIS_PER_DAY)
-																		                 .pageSize(Integer.MAX_VALUE)
-																		                 .query(query)
-																	                     .send();
-            if(!response.isSuccess()) {
-                LOG.error(String.format("Failed to get model for user: %s, algorithm: %s, due to: %s",user,algorithm,response.getException()));
-            }
-
-            client.close();
-            return response.getObj();
-		} catch (Exception ex) {
-			LOG.info("Got an exception when query machinelearning model service ", ex);
-			throw new IllegalStateException(ex);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
deleted file mode 100644
index d0ac75f..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.impl;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
-import org.apache.eagle.ml.MLPolicyEvaluator;
-import org.apache.eagle.ml.model.MLPolicyDefinition;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-public class MLPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServiceProvider {
-
-	private final static String ALERT_CONTEXT = "alertContext";
-
-	@Override
-	public String getPolicyType() {
-		return Constants.policyType.MachineLearning.name();
-	}
-
-	@Override
-	public Class<? extends PolicyEvaluator> getPolicyEvaluator() {
-		return MLPolicyEvaluator.class;
-	}
-
-	@Override
-	public List<Module> getBindingModules() {
-		Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(MLPolicyDefinition.class, getPolicyType()));
-		Module module2 = new SimpleModule(ALERT_CONTEXT).registerSubtypes(new NamedType(Properties.class, getPolicyType()));
-		return Arrays.asList(module1, module2);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java
deleted file mode 100644
index 24b4bad..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLAlgorithm.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import java.io.Serializable;
-/**
- * Serializable bean describing one algorithm entry of an {@code MLPolicyDefinition}
- * (which holds an {@code MLAlgorithm[]}). It carries four plain String properties
- * ({@code name}, {@code evaluator}, {@code description}, {@code features}), each with
- * a getter and a setter; no validation is performed.
- */
-public class MLAlgorithm implements Serializable {
-    private static final long serialVersionUID = -223531147520123L;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    private String name;
-    private String evaluator; // class name that MLReflectionUtils.newMLAlgorithmEvaluator passes to Class.forName
-    private String description;
-
-    public String getFeatures() {
-        return features;
-    }
-
-    public void setFeatures(String features) {
-        this.features = features;
-    }
-
-    private String features; // presumably a comma-separated feature list, as in the MLPolicyDefinition Javadoc example - TODO confirm
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public String getEvaluator() {
-        return evaluator;
-    }
-
-    public void setEvaluator(String evaluator) {
-        this.evaluator = evaluator;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java
deleted file mode 100644
index b457973..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLCallbackResult.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-/**
- * Mutable value object holding, for a list of data points and an id, whether they were
- * flagged as an anomaly, the features involved, a confidence value, a timestamp, the
- * algorithm name and a free-form String-to-String context map.
- *
- * The no-arg constructor initialises {@code feature} and {@code datapoints} to empty
- * lists; every other field keeps its Java default until a setter is called.
- */
-public class MLCallbackResult {
-	private boolean isAnomaly;
-	private List<String> feature; // empty list after construction
-	private double confidence; // not printed by toString(); the append is commented out there
-	private long timestamp; // NOTE(review): unit is not visible here (presumably epoch millis) - confirm
-	private List<String> datapoints; // empty list after construction
-	private String id; // printed by toString() as "for user: <id>"
-	private String algorithmName;
-
-    public Map<String, String> getContext() {
-        return context;
-    }
-
-    public void setContext(Map<String, String> context) {
-        this.context = context;
-    }
-
-    public void setAlgorithmName(String algorithmName) {
-        this.algorithmName = algorithmName;
-    }
-
-    private Map<String,String> context; // null until setContext is called
-
-	public String getAlgorithmName() {
-		return algorithmName;
-	}
-	public void setAlgorithm(String algorithmName) { // does exactly what setAlgorithmName does; both write algorithmName
-		this.algorithmName = algorithmName;
-	}
-	public MLCallbackResult(){
-		feature = new ArrayList<String>();
-		setDatapoints(new ArrayList<String>());
-	}
-	public boolean isAnomaly() {
-		return isAnomaly;
-	}
-	public void setAnomaly(boolean isAnomaly) {
-		this.isAnomaly = isAnomaly;
-	}
-	public List<String> getFeature() {
-		return feature;
-	}
-	public void setFeature(List<String> feature) { // replaces the whole list (a null argument is stored as-is)
-		this.feature = feature;
-	}
-	/** Case-insensitive membership test against {@code feature}; the loop has no early exit, so the whole list is always scanned. */
-	private boolean doesFeatureExist(String f){
-		boolean alreadyExist = false;
-		for(String s:feature){
-			if(s.equalsIgnoreCase(f))
-				alreadyExist = true;
-		}
-		return alreadyExist;
-	}
-	public void setFeature(String aFeature) { // despite the "set" name this APPENDS aFeature, and only if no case-insensitive match exists
-		if(doesFeatureExist(aFeature) == false)
-			feature.add(aFeature);
-	}
-	public double getConfidence() {
-		return confidence;
-	}
-	public void setConfidence(double confidence) {
-		this.confidence = confidence;
-	} 
-	public String toString(){ // "datapoint :<d1,d2> for user: ID is anomaly: B at timestamp: T", plus " with algorithm: A with features: [f1,f2]" when isAnomaly
-		StringBuffer resultStr = new StringBuffer(); // NOTE(review): a local buffer does not need StringBuffer's synchronization; StringBuilder would do
-		resultStr.append("datapoint :<");
-		int i=0; 
-		for(String d:datapoints){ // NOTE(review): throws NullPointerException if datapoints was set to null or holds a null element
-			if(i < datapoints.size()-1){ // every element except the last is followed by a comma
-				resultStr.append(d.trim()); 
-				resultStr.append(",");
-				i++;
-			}else
-				resultStr.append(d.trim());
-		}
-		resultStr.append("> for user: ").append(id).append(" is anomaly: ").append(this.isAnomaly).append(" at timestamp: ").append(this.timestamp);
-		if(this.isAnomaly){ // algorithm and feature details are only printed for anomalies
-			//resultStr.append(" with confidence: " + this.confidence);
-			resultStr.append(" with algorithm: " + algorithmName);
-			resultStr.append(" with features: ");
-			int p=0;
-			resultStr.append("["); 
-			for(String s:feature){ // same comma-join as above; same NullPointerException risk for a null list or element
-				if(p < feature.size()-1){
-					resultStr.append(s.trim());
-					resultStr.append(",");
-					p++;
-				}else
-					resultStr.append(s.trim());
-			}
-			resultStr.append("]");
-		}
-		return resultStr.toString();
-	}
-	
-	public long getTimestamp() {
-		return timestamp;
-	}
-	public void setTimestamp(long timestamp) {
-		this.timestamp = timestamp;
-	}
-	public List<String> getDatapoints() {
-		return datapoints;
-	}
-	public void setDatapoints(List<String> datapoints) { // also called by the constructor with an empty ArrayList
-		this.datapoints = datapoints;
-	}
-	public String getId() {
-		return id;
-	}
-	public void setId(String id) {
-		this.id = id;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java
deleted file mode 100644
index 883847a..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLEntityRepository.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-/**
- * {@link EntityRepository} whose constructor adds {@code MLModelAPIEntity.class} to the
- * {@code entitySet} collection (not declared in this class, so inherited from the parent).
- */
-public class MLEntityRepository extends EntityRepository {
-	public MLEntityRepository() {
-		entitySet.add(MLModelAPIEntity.class); // the only entity class this repository contributes
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java
deleted file mode 100644
index 6f91ca6..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLModelAPIEntity.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import org.apache.eagle.ml.MLConstants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-/**
- * Entity holding one stored ML model: its {@code content} (column "b") and a numeric
- * {@code version} (column "c").
- *
- * The annotations below map it to table "mlmodel", column family "f", prefix "mlmodel",
- * the service named by {@code MLConstants.ML_MODEL_SERVICE_NAME}, and the tags
- * "site", "user" and "algorithm"; it is declared as not time-series. Null properties are
- * omitted on serialization and unknown properties are ignored on deserialization.
- *
- * DDL for creating the table
- * create 'mlmodel', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("mlmodel")
-@ColumnFamily("f")
-@Prefix("mlmodel")
-@Service(MLConstants.ML_MODEL_SERVICE_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"site","user", "algorithm"})
-public class MLModelAPIEntity extends TaggedLogAPIEntity{
-	@Column("b")
-	private String content; // NOTE(review): format of the model content is not visible here - confirm with the writer of this entity
-    @Column("c")
-    private long version;
-
-    public String getContent() {
-		return content;
-	}
-
-	public void setContent(String content) {
-		this.content = content;
-		valueChanged("content"); // presumably tells TaggedLogAPIEntity that this field was modified - TODO confirm
-	}
-
-    public long getVersion() {
-        return version;
-    }
-
-    public void setVersion(long version) {
-        this.version = version;
-        valueChanged("version"); // same notification as in setContent, for the "version" field
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
deleted file mode 100644
index 70e0f71..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.model;
-
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import java.util.Properties;
-
-/**
- Policy definition for the machine-learning policy type. It holds a version string,
- a context of type java.util.Properties and an array of MLAlgorithm entries.
-
- Example definition:
-
- {  
-   "type":"MachineLearning",
-   "context":{
-      "site":"dev",
-      "dataSource":"userprofile",
-      "component":"dev-component",
-      "description":"ML based user profile anomaly detection",
-      "severity":"WARNING",
-      "notificationByEmail":"true"
-   },
-   "algorithms":[
-      {
-         "name":"eagle.security.userprofile.util.EigenBasedAnomalyDetection",
-         "description":"EigenBasedAnomalyDetection",
-         "features":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete"
-      },
-      {  
-         "name":"eagle.security.userprofile.util.KDEBasedAnomalyDetection",
-         "description":"DensityBasedAnomalyDetection",
-         "features":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete"
-      }
-   ]
-}
-   The version field (not shown in the example above) is used for model updates, so that
-   the Eagle framework can detect that something has changed.
-
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true) // type id is read by name from the "type" property; visible=true leaves that property in the data handed to the deserializer
-@JsonIgnoreProperties(ignoreUnknown = true) // properties without a matching field/setter are skipped instead of failing
-public class MLPolicyDefinition extends AbstractPolicyDefinition{
-	private String version;
-	private Properties context;
-    private MLAlgorithm[] algorithms;
-
-    public MLAlgorithm[] getAlgorithms() { // returns the internal array itself, not a copy
-        return algorithms;
-    }
-
-    public void setAlgorithms(MLAlgorithm[] algorithms) {
-        this.algorithms = algorithms;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version;
-    }
-
-    public Properties getContext() {
-        return context;
-    }
-
-    public void setContext(Properties context) {
-        this.context = context;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java
deleted file mode 100644
index fe70165..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/utils/MLReflectionUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.ml.utils;
-
-import org.apache.eagle.ml.MLAlgorithmEvaluator;
-import org.apache.eagle.ml.model.MLAlgorithm;
-
-/**
- * Reflection helper that instantiates the evaluator class named by an {@code MLAlgorithm}.
- *
- * @since 8/14/15
- */
-public class MLReflectionUtils {
-    /**
-     * Create a new MLAlgorithmEvaluator based on MLAlgorithm.
-     *
-     * The class named by {@code algorithm.getEvaluator()} is loaded with
-     * {@code Class.forName}, instantiated through its no-argument constructor and cast
-     * to {@code MLAlgorithmEvaluator}. Two unchecked failures are not declared: a
-     * {@code ClassCastException} when the loaded class is not an MLAlgorithmEvaluator,
-     * and a {@code NullPointerException} when {@code algorithm} is null.
-     *
-     * NOTE(review): {@code Class.newInstance()} is deprecated since Java 9 in favour of
-     * {@code getDeclaredConstructor().newInstance()}.
-     *
-     * @param algorithm MLAlgorithm whose {@code evaluator} property holds the class name to load
-     * @return MLAlgorithmEvaluator object instance (a new instance on every call)
-     * @throws ClassNotFoundException if no class with the configured name can be located
-     * @throws IllegalAccessException if the class or its no-argument constructor is not accessible
-     * @throws InstantiationException if the class cannot be instantiated (e.g. abstract, interface, or no no-argument constructor)
-     */
-    public static MLAlgorithmEvaluator newMLAlgorithmEvaluator(MLAlgorithm algorithm) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-        return (MLAlgorithmEvaluator) Class.forName(algorithm.getEvaluator()).newInstance();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
deleted file mode 100644
index 2125cbc..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  "envContextConfig" : {
-    "env" : "storm",
-    "topologyName" : "securityLogProcessTopology",
-    "mode" : "local",
-    "parallelismConfig" : {
-    }
-  },
-  "dataSourceConfig": {
-    "flavor" : "stormhdfs",
-    "hdfsConnection" : "10.225.92.42:9000",
-    "hdfsPath" : "/tmp/user1",
-    "copyToPath" : "/Users/user1/Security/hive_files/weeks_file/MLDemo_testing/consolidated",
-    "fileFormat" : "CSV",
-    "typeOperation" : "user profile generation",
-    "userlist": "/UserList.txt",
-    "containsFileHeader": true
-  },
-  "eagleProps" : {
-    "application" : "hdfsAuditLog-ML",
-    "env" : "test",
-    "mail.host" : "mailHost.com",
-    "mail.debug" : "true",
-    "eagleService": {
-      "host": "localhost",
-      "port": 9099,
-      "username": "admin",
-      "password": "secret"
-    }
-  },
-  "alertExecutorConfigs" : {
-     "userAnomalousActivityDetectionAlertExecutor" : {
-       "parallelism" : 2,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-     }
-  },
-  "dynamicConfigSource" : {
-     "eagleServiceHost" : "localhost",
-     "eagleServicePort" : "8080",
-     "enabled"          : "true"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties
deleted file mode 100644
index 71a5dac..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, DRFA, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt
deleted file mode 100644
index 2f4bcb1..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/ml-policyDef-UserProfile.txt
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
-   "type":"MachineLearning",
-   "fileTypes":[
-      {
-         "type":"CSV",
-         "containsHeader":"true",
-         "fileParserClass":"eagle.ml.CSVFileFormatParser"
-      }
-   ],
-   "alertContext":{
-      "cluster":"testCluster",
-      "datacenter":"testDataCenter",
-      "component":"testComponent",
-      "description":"ML based user profile anomaly detection",
-      "severity":"WARNING",
-      "notificationByEmail":"true"
-   },
-   "algorithms":[
-      {
-         "name":"org.apache.eagle.security.userprofile.util.EigenBasedAnomalyDetection",
-         "description":"EigenBasedAnomalyDetection",
-         "modelPath":"/models/",
-         "featureSet":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete, setOwner, fsck",
-         "type":"CSV"
-      },
-      {
-         "name":"org.apache.eagle.security.userprofile.util.KDEBasedAnomalyDetection",
-         "description":"DensityBasedAnomalyDetection",
-         "modelPath":"/models/",
-         "featureSet":"getfileinfo, open, listStatus, setTimes, setPermission, rename, mkdirs, create, setReplication, contentSummary, delete, setOwner, fsck",
-         "type":"CSV"
-      }
-   ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-machinelearning/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/pom.xml b/eagle-core/eagle-machinelearning/pom.xml
deleted file mode 100644
index e79ebfc..0000000
--- a/eagle-core/eagle-machinelearning/pom.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>eagle-core</artifactId>
-        <groupId>org.apache.eagle</groupId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>eagle-machinelearning-parent</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>eagle-machinelearning-base</module>
-    </modules>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/pom.xml b/eagle-core/pom.xml
index 9041840..0bcab2a 100644
--- a/eagle-core/pom.xml
+++ b/eagle-core/pom.xml
@@ -39,7 +39,6 @@
         <module>eagle-policy</module>
         <module>eagle-alert-parent</module>
         <module>eagle-query</module>
-   		<module>eagle-machinelearning</module>
         <module>eagle-embed</module>
         <module>eagle-metric</module>
         <module>eagle-application-management</module>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
index f8794f3..eac2bfd 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
+org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file


[2/2] incubator-eagle git commit: EAGLE-370 absence alert engine absence alert engine

Posted by yo...@apache.org.
EAGLE-370 absence alert engine
absence alert engine

Author: Yong Zhang <yo...@apache.org>
Reviewer: Yong Zhang

Closes: #262


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1dffec09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1dffec09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1dffec09

Branch: refs/heads/develop
Commit: 1dffec09ccb084c484e0427d2ce2cb567403cf21
Parents: 0b77d94
Author: yonzhang <yo...@gmail.com>
Authored: Mon Jul 11 16:56:46 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Mon Jul 11 16:56:46 2016 -0700

----------------------------------------------------------------------
 .../engine/evaluator/PolicyStreamHandlers.java  |   8 +-
 .../evaluator/absence/AbsenceAlertDriver.java   |  68 ++++++++
 .../evaluator/absence/AbsenceDailyRule.java     |  26 +++
 .../evaluator/absence/AbsencePolicyHandler.java | 134 +++++++++++++++
 .../engine/evaluator/absence/AbsenceRule.java   |  23 +++
 .../engine/evaluator/absence/AbsenceWindow.java |  38 +++++
 .../absence/AbsenceWindowGenerator.java         |  50 ++++++
 .../absence/AbsenceWindowProcessor.java         |  97 +++++++++++
 .../impl/DistinctValuesInTimeWindow.java        | 141 ---------------
 .../nodata/DistinctValuesInTimeWindow.java      | 141 +++++++++++++++
 .../evaluator/nodata/NoDataPolicyHandler.java   |  28 ++-
 .../publisher/impl/JsonEventSerializer.java     |  71 ++++++++
 .../alert/engine/absence/TestAbsenceDriver.java |  96 +++++++++++
 .../absence/TestAbsencePolicyHandler.java       | 111 ++++++++++++
 .../absence/TestAbsenceWindowGenerator.java     |  80 +++++++++
 .../absence/TestAbsenceWindowProcessor.java     |  70 ++++++++
 .../engine/e2e/Integration5AbsenceAlert.java    |  94 ++++++++++
 .../engine/e2e/SampleClient5AbsenceAlert.java   |  93 ++++++++++
 .../nodata/TestDistinctValuesInTimeWindow.java  |   2 +-
 .../alert/engine/nodata/TestNoDataAlert.java    |  33 +++-
 .../engine/nodata/TestNoDataPolicyHandler.java  |   4 +-
 .../resources/absence/application-absence.conf  |  60 +++++++
 .../src/test/resources/absence/datasources.json |  17 ++
 .../src/test/resources/absence/policies.json    |  24 +++
 .../test/resources/absence/publishments.json    |  20 +++
 .../resources/absence/streamdefinitions.json    |  29 ++++
 .../src/test/resources/absence/topologies.json  |  31 ++++
 .../src/test/resources/nodata/policies.json     |   5 +-
 .../eagle-machinelearning-base/pom.xml          |  37 ----
 .../apache/eagle/ml/MLAlgorithmEvaluator.java   |  50 ------
 .../org/apache/eagle/ml/MLAnomalyCallback.java  |  28 ---
 .../java/org/apache/eagle/ml/MLConstants.java   |  24 ---
 .../java/org/apache/eagle/ml/MLModelDAO.java    |  30 ----
 .../org/apache/eagle/ml/MLPolicyEvaluator.java  | 170 -------------------
 .../eagle/ml/impl/MLAnomalyCallbackImpl.java    | 107 ------------
 .../apache/eagle/ml/impl/MLModelDAOImpl.java    |  62 -------
 .../MLPolicyEvaluatorServiceProviderImpl.java   |  52 ------
 .../org/apache/eagle/ml/model/MLAlgorithm.java  |  61 -------
 .../apache/eagle/ml/model/MLCallbackResult.java | 137 ---------------
 .../eagle/ml/model/MLEntityRepository.java      |  25 ---
 .../apache/eagle/ml/model/MLModelAPIEntity.java |  67 --------
 .../eagle/ml/model/MLPolicyDefinition.java      |  82 ---------
 .../eagle/ml/utils/MLReflectionUtils.java       |  38 -----
 ....eagle.policy.PolicyEvaluatorServiceProvider |  16 --
 ....eagle.policy.PolicyEvaluatorServiceProvider |  16 --
 .../src/test/resources/application.conf         |  57 -------
 .../src/test/resources/log4j.properties         |  34 ----
 .../test/resources/ml-policyDef-UserProfile.txt |  51 ------
 eagle-core/eagle-machinelearning/pom.xml        |  33 ----
 eagle-core/pom.xml                              |   1 -
 ....eagle.policy.PolicyEvaluatorServiceProvider |   3 +-
 51 files changed, 1443 insertions(+), 1332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index e8f736c..638b240 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -19,19 +19,23 @@ package org.apache.eagle.alert.engine.evaluator;
 import java.util.Map;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
 
 public class PolicyStreamHandlers {
     public static final String SIDDHI_ENGINE ="siddhi";
     public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
+    public static final String ABSENCE_ALERT_ENGINE ="absencealert";
 
     public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
         if(SIDDHI_ENGINE.equals(type)) {
             return new SiddhiPolicyHandler(sds);
         }else if(NO_DATA_ALERT_ENGINE.equals(type)){
             return new NoDataPolicyHandler(sds);
+        }else if(ABSENCE_ALERT_ENGINE.equals(type)){
+            return new AbsencePolicyHandler(sds);
         }
-        throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
+        throw new IllegalArgumentException("Illegal policy stream handler type " + type);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
new file mode 100644
index 0000000..bf142cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/7/16.
+ * this assumes that event comes in time order
+ */
+public class AbsenceAlertDriver {
+    private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class);
+    private List<Object> expectedAttrs;
+    private AbsenceWindowProcessor processor;
+    private AbsenceWindowGenerator windowGenerator;
+
+    public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator){
+        this.expectedAttrs = expectedAttrs;
+        this.windowGenerator = windowGenerator;
+    }
+
+    public void process(List<Object> appearAttrs, long occurTime){
+        // initialize window
+        if(processor == null){
+            processor = nextProcessor(occurTime);
+            LOG.info("initialized a new window {}", processor);
+        }
+        processor.process(appearAttrs, occurTime);
+        AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
+        boolean expired = processor.checkExpired();
+        if(expired){
+            if(status == AbsenceWindowProcessor.OccurStatus.absent){
+                // send alert (currently the absence is only logged)
+                LOG.info("this is an alert");
+                // the next window is created below, whether or not an alert was raised
+            }
+            processor = nextProcessor(occurTime);
+            LOG.info("created a new window {}", processor);
+        }
+    }
+
+    /**
+     * calculate absolute time range based on current timestamp
+     * @param currTime milliseconds
+     * @return
+     */
+    private AbsenceWindowProcessor nextProcessor(long currTime){
+        AbsenceWindow window = windowGenerator.nextWindow(currTime);
+        return new AbsenceWindowProcessor(expectedAttrs, window);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
new file mode 100644
index 0000000..ed50280
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceDailyRule implements AbsenceRule {
+    public static final long DAY_MILLI_SECONDS = 86400*1000L;
+    public long startOffset;
+    public long endOffset;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
new file mode 100644
index 0000000..0a07a27
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Since 7/6/16.
+ * A policy would look like:
+ * {
+ "name": "absenceAlertPolicy",
+ "description": "absenceAlertPolicy",
+ "inputStreams": [
+ "absenceAlertStream"
+ ],
+ "outputStreams": [
+ "absenceAlertStream_out"
+ ],
+ "definition": {
+ "type": "absencealert",
+ "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "absenceAlertStream",
+ "type": "GROUPBY",
+ "columns" : ["jobID"]
+ }
+ ],
+ "parallelismHint": 2
+ }
+ */
+public class AbsencePolicyHandler implements PolicyStreamHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class);
+    private Map<String, StreamDefinition> sds;
+    private volatile PolicyDefinition policyDef;
+    private volatile Collector<AlertStreamEvent> collector;
+    private volatile PolicyHandlerContext context;
+    private volatile List<Integer> expectFieldIndices = new ArrayList<>();
+    private volatile List<Object> expectValues = new ArrayList<>();
+    private AbsenceAlertDriver driver;
+
+    public AbsencePolicyHandler(Map<String, StreamDefinition> sds){
+        this.sds = sds;
+    }
+
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+        this.context = context;
+        this.policyDef = context.getPolicyDefinition();
+        List<String> inputStreams = policyDef.getInputStreams();
+        // validate inputStreams has to contain only one stream
+        if(inputStreams.size() != 1)
+            throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
+        // validate outputStream has to contain only one stream
+        if(policyDef.getOutputStreams().size() != 1)
+            throw new IllegalArgumentException("policy outputStream size has to be 1 for absense alert");
+
+        String is = inputStreams.get(0);
+        StreamDefinition sd = sds.get(is);
+
+        String policyValue = policyDef.getDefinition().getValue();
+
+        // assume that the absence alert policy value has the form "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset"
+        String[] segments = policyValue.split(",");
+        int offset = 0;
+        // populate wisb field names
+        int numOfFields = Integer.parseInt(segments[offset++]);
+        for(int i = offset; i < offset+numOfFields; i++){
+            String fn = segments[i];
+            expectFieldIndices.add(sd.getColumnIndex(fn));
+        }
+        offset += numOfFields;
+        for(int i = offset; i < offset+numOfFields; i++){
+            String fn = segments[i];
+            expectValues.add(fn);
+        }
+        offset += numOfFields;
+        String absence_window_rule_type = segments[offset++];
+        AbsenceDailyRule rule = new AbsenceDailyRule();
+        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date t1 = sdf.parse(segments[offset++]);
+        rule.startOffset = t1.getTime();
+        Date t2 = sdf.parse(segments[offset++]);
+        rule.endOffset = t2.getTime();
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+        driver = new AbsenceAlertDriver(expectValues, generator);
+    }
+
+    @Override
+    public void send(StreamEvent event) throws Exception {
+        Object[] data = event.getData();
+        List<Object> columnValues = new ArrayList<>();
+        for(int i=0; i<expectFieldIndices.size(); i++){
+            Object o = data[expectFieldIndices.get(i)];
+            // convert value to string
+            columnValues.add(o.toString());
+        }
+
+        driver.process(columnValues, event.getTimestamp());
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
new file mode 100644
index 0000000..272d5cf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public interface AbsenceRule {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
new file mode 100644
index 0000000..728e702
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindow {
+    public long startTime;
+    public long endTime;
+
+    public String toString(){
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+        String t1 = sdf.format(new Date(startTime));
+        String t2 = sdf.format(new Date(endTime));
+        String format = "startTime=%d (%s), endTime=%d (%s)";
+        return String.format(format, startTime, t1, endTime, t2);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
new file mode 100644
index 0000000..6cd0880
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindowGenerator {
+    private AbsenceRule rule;
+    public AbsenceWindowGenerator(AbsenceRule rule){
+        this.rule = rule;
+    }
+
+    /**
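+     * @param currTime current timestamp in milliseconds
+     * @return the daily window derived from currTime and the rule's start/end offsets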
+     */
+    public AbsenceWindow nextWindow(long currTime){
+        AbsenceWindow window = new AbsenceWindow();
+        if(rule instanceof AbsenceDailyRule){
+            AbsenceDailyRule r = (AbsenceDailyRule)rule;
+            long adjustment = 0; // if the time of day is already past today's startOffset, then adjust to tomorrow's window
+            if(currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset){
+                adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS;
+            }
+            // use current timestamp to round down to day
+            long day = currTime - currTime % AbsenceDailyRule.DAY_MILLI_SECONDS;
+            day += adjustment;
+            window.startTime = day + r.startOffset;
+            window.endTime = day + r.endOffset;
+            return window;
+        }else{
+            throw new UnsupportedOperationException("not supported rule " + rule);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
new file mode 100644
index 0000000..4e8d381
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/6/16.
+ * To process each incoming event
+ * internally maintain state machine to trigger alert when some attribute does not occur within this window
+ */
+public class AbsenceWindowProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(AbsenceWindowProcessor.class);
+    private List<Object> expectAttrs;
+    private AbsenceWindow window;
+    private boolean expired; // marks whether the window's time range has already been passed
+    private OccurStatus status = OccurStatus.not_sure;
+
+    public enum OccurStatus{
+        not_sure,
+        occured,
+        absent
+    }
+
+    public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window){
+        this.expectAttrs = expectAttrs;
+        this.window = window;
+        expired = false;
+    }
+
+    /**
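+     * Feed one event into the window's state machine; nothing is returned, the outcome is read through checkStatus() and checkExpired()
+     * @param appearAttrs attribute values of the event, compared with the expected attributes
+     * @param occurTime timestamp of the event, compared with the window's startTime and endTime
+     * @throws IllegalStateException if this window has already expired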
+     */
+    public void process(List<Object> appearAttrs, long occurTime){
+        if(expired)
+            throw new IllegalStateException("Expired window can't recieve events");
+        switch(status) {
+            case not_sure:
+                if(occurTime < window.startTime) {
+                    break;
+                }else if(occurTime >= window.startTime &&
+                        occurTime <= window.endTime) {
+                    if(expectAttrs.equals(appearAttrs)) {
+                        status = OccurStatus.occured;
+                    }
+                    break;
+                }else{
+                    status = OccurStatus.absent;
+                    break;
+                }
+            case occured:
+                if(occurTime > window.endTime)
+                    expired = true;
+                break;
+            default:
+                break;
+        }
+        // once absence is certain the window is finished: mark it expired
+        if(status == OccurStatus.absent){
+            expired = true;
+        }
+    }
+
+    public OccurStatus checkStatus(){
+        return status;
+    }
+    public boolean checkExpired(){
+        return expired;
+    }
+    public AbsenceWindow currWindow(){
+        return window;
+    }
+
+    public String toString(){
+        return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
deleted file mode 100644
index 8a681da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * Since 6/28/16.
- * to get distinct values within a specified time window
- * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
- * timeSortedMap : map sorted by timestamp first and then value
- * With the above 2 data structure, we can get distinct values in LOG(N)
- */
-public class DistinctValuesInTimeWindow {
-    public static class ValueAndTime{
-        Object value;
-        long timestamp;
-        public ValueAndTime(Object value, long timestamp){
-            this.value = value;
-            this.timestamp = timestamp;
-        }
-
-        public String toString(){
-            return "[" + value + "," + timestamp + "]";
-        }
-
-        public int hashCode(){
-            return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
-        }
-
-        public boolean equals(Object that){
-            if(!(that instanceof ValueAndTime))
-                return false;
-            ValueAndTime another = (ValueAndTime)that;
-            return another.timestamp == this.timestamp && another.value.equals(this.value);
-        }
-    }
-
-    public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
-        @Override
-        public int compare(ValueAndTime o1, ValueAndTime o2) {
-            if(o1.timestamp != o2.timestamp)
-                return (o1.timestamp > o2.timestamp) ? 1 : -1;
-            if(o1.value.equals(o2.value))
-                return 0;
-            else {
-                // this is not strictly correct, but I don't want to write too many comparators here :-)
-                if(o1.hashCode() > o2.hashCode())
-                    return 1;
-                else
-                    return -1;
-            }
-        }
-    }
-
-    /**
-     * map from value to max timestamp for this value
-     */
-    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
-    /**
-     * map sorted by time(max timestamp for the value) and then value
-     */
-    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
-    private long maxTimestamp = 0L;
-    private long window;
-    private boolean windowSlided;
-
-    /**
-     * @param window - milliseconds
-     */
-    public DistinctValuesInTimeWindow(long window){
-        this.window = window;
-    }
-
-    public void send(Object value, long timestamp){
-        ValueAndTime vt = new ValueAndTime(value, timestamp);
-
-        // todo think of time out of order
-        if(valueMaxTimeMap.containsKey(value)){
-            // remove that entry with old timestamp in timeSortedMap
-            long oldTime = valueMaxTimeMap.get(value);
-            if(oldTime >= timestamp){
-                // no any effect as the new timestamp is equal or even less than old timestamp
-                return;
-            }
-            timeSortedMap.remove(new ValueAndTime(value, oldTime));
-        }
-        // insert entry with new timestamp in timeSortedMap
-        timeSortedMap.put(vt, vt);
-        // update new timestamp in valueMaxTimeMap
-        valueMaxTimeMap.put(value, timestamp);
-
-        // evict old entries
-        // store max timestamp if possible
-        maxTimestamp = Math.max(maxTimestamp, timestamp);
-
-        // check if some values should be evicted because of time window
-        Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
-        while(it.hasNext()){
-            Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
-            if(entry.getKey().timestamp < maxTimestamp - window){
-                // should remove the entry in valueMaxTimeMap and timeSortedMap
-                valueMaxTimeMap.remove(entry.getKey().value);
-                windowSlided = true;
-
-                it.remove();
-            }else {
-                break;
-            }
-        }
-    }
-
-    public Map<Object, Long> distinctValues(){
-        return valueMaxTimeMap;
-    }
-
-    public boolean windowSlided(){
-        return windowSlided;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..676357a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Since 6/28/16.
+ * Keeps the distinct values seen within a sliding time window.
+ * valueMaxTimeMap : each distinct value is associated with the max timestamp it ever had
+ * timeSortedMap : the same (value, max timestamp) pairs sorted by timestamp first and then value
+ * The window ends at the largest timestamp sent so far; every send() evicts, oldest first,
+ * the entries whose timestamp is older than (largest timestamp - window).
+ */
+public class DistinctValuesInTimeWindow {
+    /**
+     * A value paired with the timestamp it was observed at. The value may be null.
+     */
+    public static class ValueAndTime{
+        Object value;
+        long timestamp;
+        public ValueAndTime(Object value, long timestamp){
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+
+        public String toString(){
+            return "[" + value + "," + timestamp + "]";
+        }
+
+        public int hashCode(){
+            return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
+        }
+
+        /**
+         * Two pairs are equal when both the timestamp and the value match;
+         * two null values are considered to match.
+         */
+        public boolean equals(Object that){
+            if(!(that instanceof ValueAndTime))
+                return false;
+            ValueAndTime another = (ValueAndTime)that;
+            return another.timestamp == this.timestamp && sameValue(another.value, this.value);
+        }
+    }
+
+    /**
+     * Orders pairs by timestamp and, for equal timestamps, by an arbitrary but
+     * repeatable order of the values.
+     */
+    public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
+        @Override
+        public int compare(ValueAndTime o1, ValueAndTime o2) {
+            if(o1.timestamp != o2.timestamp)
+                return (o1.timestamp > o2.timestamp) ? 1 : -1;
+            // null-safe: TreeMap compares a key with itself on the first insert
+            if(sameValue(o1.value, o2.value))
+                return 0;
+            // different values at the same timestamp: order by hash code, which keeps
+            // compare(a, b) and compare(b, a) opposite in sign
+            int byHash = Integer.compare(o1.hashCode(), o2.hashCode());
+            if(byHash != 0)
+                return byHash;
+            // hash collision: fall back to the textual form instead of answering -1 in
+            // both directions. Values colliding on hash AND text still compare as 0, so
+            // this remains an approximation of a total order.
+            return String.valueOf(o1.value).compareTo(String.valueOf(o2.value));
+        }
+    }
+
+    /**
+     * map from value to max timestamp for this value
+     */
+    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+    /**
+     * map sorted by time(max timestamp for the value) and then value
+     */
+    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
+    /** largest timestamp passed to send() so far */
+    private long maxTimestamp = 0L;
+    private long window;
+    /** set to true by the first eviction; this class never resets it */
+    private boolean windowSlided;
+
+    /**
+     * @param window - milliseconds
+     */
+    public DistinctValuesInTimeWindow(long window){
+        this.window = window;
+    }
+
+    /**
+     * Null-safe equality used by both ValueAndTime and its comparator.
+     */
+    private static boolean sameValue(Object a, Object b){
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /**
+     * Records that value was seen at timestamp and evicts entries that fell out of the window.
+     * A timestamp that is not newer than the one already stored for the value is ignored.
+     *
+     * @param value the observed value, may be null
+     * @param timestamp the time of the observation
+     */
+    public void send(Object value, long timestamp){
+        // todo think of time out of order
+        // stored timestamps are never null, so a null result means "value not tracked yet"
+        Long oldTime = valueMaxTimeMap.get(value);
+        if(oldTime != null){
+            if(oldTime >= timestamp){
+                // no any effect as the new timestamp is equal or even less than old timestamp
+                return;
+            }
+            // remove that entry with old timestamp in timeSortedMap
+            timeSortedMap.remove(new ValueAndTime(value, oldTime));
+        }
+        // insert entry with new timestamp in timeSortedMap
+        ValueAndTime vt = new ValueAndTime(value, timestamp);
+        timeSortedMap.put(vt, vt);
+        // update new timestamp in valueMaxTimeMap
+        valueMaxTimeMap.put(value, timestamp);
+
+        // store max timestamp if possible
+        maxTimestamp = Math.max(maxTimestamp, timestamp);
+
+        // check if some values should be evicted because of time window;
+        // entries are visited oldest first, so stop at the first one still inside the window
+        long oldestAllowed = maxTimestamp - window;
+        Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
+        while(it.hasNext()){
+            Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
+            if(entry.getKey().timestamp >= oldestAllowed){
+                break;
+            }
+            // should remove the entry in valueMaxTimeMap and timeSortedMap
+            valueMaxTimeMap.remove(entry.getKey().value);
+            windowSlided = true;
+            it.remove();
+        }
+    }
+
+    /**
+     * @return the live internal map from each distinct value in the window to its max timestamp
+     */
+    public Map<Object, Long> distinctValues(){
+        return valueMaxTimeMap;
+    }
+
+    /**
+     * @return true once at least one entry has been evicted from the window
+     */
+    public boolean windowSlided(){
+        return windowSlided;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index ed13f71..6e5beb6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -29,7 +29,6 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
@@ -50,6 +49,29 @@ import org.slf4j.LoggerFactory;
  * fixed fields and dynamic fields
  * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
  * dynamic fields depend on wisb type.
+ *
+ * policy would be like:
+ * {
+ "name": "noDataAlertPolicy",
+ "description": "noDataAlertPolicy",
+ "inputStreams": [
+ "noDataAlertStream"
+ ],
+ "outputStreams": [
+ "noDataAlertStream_out"
+ ],
+ "definition": {
+ "type": "nodataalert",
+ "value": "PT1M,plain,1,host,host1,host2"   // or "value": "PT1M,dynamic,1,host"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "noDataAlertStream",
+ "type": "GROUPBY"
+ }
+ ],
+ "parallelismHint": 2
+ }
  */
 public class NoDataPolicyHandler implements PolicyStreamHandler{
     private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
@@ -61,10 +83,10 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
     private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
     // reuse PolicyDefinition.defintion.value field to store full set of values separated by comma
     private volatile PolicyDefinition policyDef;
-    private volatile DistinctValuesInTimeWindow distinctWindow;
     private volatile Collector<AlertStreamEvent> collector;
     private volatile PolicyHandlerContext context;
     private volatile NoDataWisbType wisbType;
+    private volatile DistinctValuesInTimeWindow distinctWindow;
 
     public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
         this.sds = sds;
@@ -161,4 +183,4 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
     public void close() throws Exception {
 
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
new file mode 100644
index 0000000..bf2a954
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes an {@link AlertStreamEvent} to a JSON string holding the policy id,
+ * stream id, creator, creation time and one property per data column.
+ *
+ * @since Jul 9, 2016
+ *
+ */
+public class JsonEventSerializer implements IEventSerializer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JsonEventSerializer.class);
+
+    /**
+     * @param stormConf configuration map; not read by this serializer
+     */
+    @SuppressWarnings("rawtypes")
+    public JsonEventSerializer(Map stormConf) throws Exception {
+    }
+
+    /**
+     * @param event the alert event to serialize
+     * @return the JSON text produced by {@link #streamEventToJson(AlertStreamEvent)}
+     */
+    @Override
+    public Object serialize(AlertStreamEvent event) {
+        String result = streamEventToJson(event);
+        if (LOG.isDebugEnabled()) {
+            // the {} placeholder is required, otherwise the argument is silently dropped
+            LOG.debug("serialized alert event : {}", result);
+        }
+        return result;
+    }
+
+    /**
+     * Builds a map of the event header fields plus every data value keyed by its
+     * column name, and renders it with JsonUtils.
+     *
+     * @param event the alert event to convert
+     * @return the JSON representation of the event
+     */
+    public String streamEventToJson(AlertStreamEvent event) {
+        Map<String, Object> jsonMap = new HashMap<String, Object>();
+        jsonMap.put("policyId", event.getPolicyId());
+        jsonMap.put("streamId", event.getStreamId());
+        jsonMap.put("createBy", event.getCreatedBy());
+        jsonMap.put("createTime", event.getCreatedTime());
+        // data
+        Object[] data = event.getData();
+        List<StreamColumn> columns = event.getSchema().getColumns();
+        for (int i = 0; i < data.length; i++) {
+            if (i >= columns.size()) {
+                // redundant check to log inconsistency: there is no column definition for
+                // index i, so columns.get(i) would throw IndexOutOfBoundsException
+                LOG.error(" strema event data have different lenght compare to column definition! ");
+            } else {
+                jsonMap.put(columns.get(i).getName(), data[i]);
+            }
+        }
+        return JsonUtils.writeValueAsString(jsonMap);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
new file mode 100644
index 0000000..ca5bfdf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceAlertDriver;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16.
+ * Drives an AbsenceAlertDriver that expects "host1" inside a daily 2PM-3PM window
+ * with a fixed sequence of hourly events starting at 2016-07-08 11:20:00 UTC.
+ */
+public class TestAbsenceDriver {
+    private static final int HOUR = 3600*1000;
+    private static final int HALF_HOUR = 1800*1000;
+
+    @Test
+    public void testAbsence() throws Exception{
+        // the expected host never shows up, not even inside the window
+        replay(newDriver(), firstEventTime(), "host2");
+    }
+
+    @Test
+    public void testOccurrence() throws Exception{
+        // the expected host shows up 3.5 hours in, i.e. inside the window
+        replay(newDriver(), firstEventTime(), "host1");
+    }
+
+    /**
+     * Builds a driver that expects "host1" from 2PM to 3PM each day.
+     */
+    private static AbsenceAlertDriver newDriver() {
+        AbsenceDailyRule dailyRule = new AbsenceDailyRule();
+        dailyRule.startOffset = 14*3600*1000;
+        dailyRule.endOffset = 15*3600*1000;
+        AbsenceWindowGenerator windows = new AbsenceWindowGenerator(dailyRule);
+        List<Object> expected = Arrays.asList("host1");
+        return new AbsenceAlertDriver(expected, windows);
+    }
+
+    /**
+     * @return epoch millis of the first event, 2016-07-08 11:20:00 UTC
+     */
+    private static long firstEventTime() throws Exception {
+        DateFormat utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        utc.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date first = utc.parse("2016-07-08 11:20:00");
+        return first.getTime();
+    }
+
+    /**
+     * Sends the six events of the scenario; only the host of the 3.5 hour event varies.
+     */
+    private static void replay(AbsenceAlertDriver driver, long base, String hostAtHalfPastThree) throws Exception {
+        // first event, then one hour and two hours later: all before the window
+        driver.process(Arrays.asList("host2"), base);
+        driver.process(Arrays.asList("host2"), base + HOUR);
+        driver.process(Arrays.asList("host2"), base + 2*HOUR);
+        // three hours later: enters the window
+        driver.process(Arrays.asList("host2"), base + 3*HOUR);
+        // 3.5 hours later: still in the window
+        driver.process(Arrays.asList(hostAtHalfPastThree), base + 3*HOUR + HALF_HOUR);
+        // four hours later: exits the window
+        driver.process(Arrays.asList("host2"), base + 4*HOUR);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
new file mode 100644
index 0000000..7f325c4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Since 7/8/16.
+ * Smoke test that prepares an AbsencePolicyHandler with an "absencealert" policy
+ * and sends it a single stream event.
+ */
+public class TestAbsencePolicyHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(TestAbsencePolicyHandler.class);
+    private static final String inputStream = "testInputStream";
+    private static final String outputStream = "testOutputStream";
+
+    @Test
+    public void test() throws Exception{
+        test(buildPolicyDef_provided());
+    }
+
+    /**
+     * Prepares a handler for the given policy and feeds it one event at timestamp 0.
+     */
+    public void test(PolicyDefinition pd) throws Exception{
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        StreamDefinition sd = buildStreamDef();
+        // key the definition by the same constant the policy and the event use,
+        // so the three cannot drift apart
+        sds.put(inputStream, sd);
+        AbsencePolicyHandler handler = new AbsencePolicyHandler(sds);
+
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(pd);
+        handler.prepare(new TestCollector(), context);
+
+        handler.send(buildStreamEvt(0, "job1", "running"));
+    }
+
+    /**
+     * Collector that checks every emitted alert.
+     * NOTE(review): it expects "host2" in data[1] although the policy under test
+     * watches jobID "job1" - confirm the intended value.
+     */
+    private static class TestCollector implements Collector {
+        @Override
+        public void emit(Object o) {
+            AlertStreamEvent e = (AlertStreamEvent)o;
+            Object[] data = e.getData();
+            Assert.assertEquals("host2", data[1]);
+            LOG.info(e.toString());
+        }
+    }
+
+    /**
+     * Builds the "absencealert" policy on the test input and output streams.
+     * The value presumably reads: number of fields, field name, expected value,
+     * rule type, window start, window end - TODO confirm against AbsencePolicyHandler.
+     */
+    private PolicyDefinition buildPolicyDef_provided(){
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+        def.setType("absencealert");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList(inputStream));
+        pd.setOutputStreams(Arrays.asList(outputStream));
+        pd.setName("absencealert-test");
+        return pd;
+    }
+
+    /**
+     * Builds a stream definition with the columns timestamp (LONG), jobID (STRING)
+     * and status (STRING), in that order.
+     */
+    private StreamDefinition buildStreamDef(){
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("jobID");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn valueColumn = new StreamColumn();
+        valueColumn.setName("status");
+        valueColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+        sd.setDataSource("testDataSource");
+        sd.setStreamId("testStreamId");
+        return sd;
+    }
+
+    /**
+     * Builds an event on the test input stream whose data follows the column order
+     * of buildStreamDef(): timestamp, jobID, status.
+     */
+    private StreamEvent buildStreamEvt(long ts, String jobID, String status){
+        StreamEvent e = new StreamEvent();
+        e.setData(new Object[]{ts, jobID, status});
+        e.setStreamId(inputStream);
+        e.setTimestamp(ts);
+        return e;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
new file mode 100644
index 0000000..e2345c9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16.
+ * Checks which daily 2PM-3PM window AbsenceWindowGenerator hands out for a given time.
+ */
+public class TestAbsenceWindowGenerator {
+    @Test
+    public void testWindowInToday() throws Exception{
+        AbsenceDailyRule rule = twoToThreePmRule();
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+        long startTimeOfDay = utcMillis("2016-07-08 00:00:00");
+
+        // 11:30 is before today's window, so today's window is the next one
+        long now = utcMillis("2016-07-08 11:30:29");
+        AbsenceWindow window = generator.nextWindow(now);
+        Assert.assertEquals(startTimeOfDay+rule.startOffset, window.startTime);
+    }
+
+    @Test
+    public void testWindowInTomorrow() throws Exception{
+        AbsenceDailyRule rule = twoToThreePmRule();
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+        long startTimeOfDay = utcMillis("2016-07-08 00:00:00");
+
+        // 18:20 is after today's window, so the next one starts a day later
+        long now = utcMillis("2016-07-08 18:20:19");
+        AbsenceWindow window = generator.nextWindow(now);
+        // this needs adjustment for one day
+        Assert.assertEquals(startTimeOfDay+rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
+    }
+
+    /**
+     * @return a daily rule running from 2PM to 3PM
+     */
+    private static AbsenceDailyRule twoToThreePmRule() {
+        AbsenceDailyRule dailyRule = new AbsenceDailyRule();
+        dailyRule.startOffset = 14*3600*1000;
+        dailyRule.endOffset = 15*3600*1000;
+        return dailyRule;
+    }
+
+    /**
+     * @param text a time formatted as yyyy-MM-dd HH:mm:ss, read as UTC
+     * @return the matching epoch millis
+     */
+    private static long utcMillis(String text) throws Exception {
+        DateFormat utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        utc.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date parsed = utc.parse(text);
+        return parsed.getTime();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
new file mode 100644
index 0000000..a47c7a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Unit tests for {@link AbsenceWindowProcessor}, all run against a window of [100, 200]. Since 7/6/16. */
+public class TestAbsenceWindowProcessor {
+    /** host1 is expected but never reported: not_sure for events up to 189, absent after the event at 201. */
+    @Test
+    public void testDataMissing(){
+        List<Object> expectedHosts = Arrays.asList("host1");
+        AbsenceWindow window = new AbsenceWindow();
+        window.startTime = 100L;
+        window.endTime = 200L;
+        AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+        processor.process(Arrays.asList("host2"), 90);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host3"), 101);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host3"), 138);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 189);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 201);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.absent, processor.checkStatus());
+    }
+
+    /** host1 is reported at 138: status becomes and stays occured; the test must end with IllegalStateException. */
+    @Test(expected = IllegalStateException.class)
+    public void testDataExists(){
+        List<Object> expectedHosts = Arrays.asList("host1");
+        AbsenceWindow window = new AbsenceWindow();
+        window.startTime = 100L;
+        window.endTime = 200L;
+        AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+        processor.process(Arrays.asList("host2"), 90);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host3"), 101);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host1"), 138);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.occured, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 189);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.occured, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 201);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.occured, processor.checkStatus());
+        Assert.assertTrue(processor.checkExpired());
+        processor.process(Arrays.asList("host2"), 225); // presumably the call that throws, as the window is already expired
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
new file mode 100644
index 0000000..52d1e5d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Manual end-to-end run of the absence alert: loads the "/absence/" metadata, launches
+ * UnitTopologyMain, sends sample events and then keeps calling proactive_schedule. Since 6/29/16.
+ */
+public class Integration5AbsenceAlert {
+    private String[] args; // never assigned in this class, so both main() calls below receive null
+
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+    // never assigned while setup() is still a FIXME, so end() is currently a no-op
+    private static KafkaEmbedded kafka;
+
+    @BeforeClass
+    public static void setup() {
+        // FIXME : start local kafka
+    }
+
+    @AfterClass
+    public static void end() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
+
+    /** Ignored by default: no local kafka is started by setup(), and the scheduling loop at the end never returns. */
+    @Test @Ignore
+    public void testTriggerAbsenceAlert() throws Exception{
+        System.setProperty("config.resource", "/absence/application-absence.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+
+        System.out.println("loading metadatas...");
+        Integration1.loadMetadatas("/absence/", config);
+        System.out.println("loading metadatas done!");
+
+        executors.submit(() -> {
+            try {
+                UnitTopologyMain.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        // wait 20 seconds for topology to bring up; an interrupt now fails the test instead of being swallowed
+        Thread.sleep(20000L);
+
+        // send mock data
+        executors.submit(() -> {
+            try {
+                SampleClient5AbsenceAlert.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        Utils.sleep(1000 * 5L);
+        // trigger scheduling again every 5 minutes, forever
+        while (true) {
+            Integration1.proactive_schedule(config);
+            Utils.sleep(1000 * 60L * 5);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
new file mode 100644
index 0000000..0256324
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Sends three sample absence events (jobID "job1", "job2" and "host3") to the Kafka topic
+ * "absenceAlertTopic". Since 6/29/16.
+ */
+public class SampleClient5AbsenceAlert {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class);
+    private static long currentTimestamp = 1467240000000L;
+    private static long interval = 3000L;
+
+    /** Loads /absence/application-absence.conf and publishes the sample events; args is not used. */
+    public static void main(String[] args) throws Exception {
+        System.setProperty("config.resource", "/absence/application-absence.conf");
+        ConfigFactory.invalidateCaches();
+
+        Config config = ConfigFactory.load();
+        // send() only queues the record, so close the producer to push everything out before returning
+        try (KafkaProducer<String, String> producer = createProducer(config)) {
+            for (String jobID : new String[]{"job1", "job2", "host3"}) {
+                producer.send(new ProducerRecord<String, String>("absenceAlertTopic", createEvent(jobID)));
+            }
+        }
+    }
+
+    private static class AbsenceEvent{
+        @JsonProperty
+        long timestamp;
+        @JsonProperty
+        String jobID;
+        @JsonProperty
+        String status;
+
+        public String toString(){
+            return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status;
+        }
+    }
+
+    /** Builds a "running" event for the given jobID and returns it serialized by Jackson's ObjectMapper. */
+    private static String createEvent(String jobID) throws Exception{
+        AbsenceEvent e = new AbsenceEvent();
+        long expectTS = currentTimestamp + interval;
+        // move the timestamp back by a random 0-2 (presumably milliseconds, not a whole second)
+        long adjust = Math.round(2*Math.random());
+        e.timestamp = expectTS-adjust;
+        e.jobID = jobID;
+        e.status = "running";
+        LOG.info("sending event {} ",  e);
+        return new ObjectMapper().writeValueAsString(e);
+    }
+
+    /** Creates a String/String producer for the servers configured under kafkaProducer.bootstrapServers. */
+    public static KafkaProducer<String, String> createProducer(Config config) {
+        String servers = config.getString("kafkaProducer.bootstrapServers");
+        Properties configMap = new Properties();
+        configMap.put("bootstrap.servers", servers);
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        // "acks" is the Java producer's setting; "request.required.acks" and the
+        // deserializer entries that used to be here are not producer settings
+        configMap.put("acks", "1");
+        return new KafkaProducer<String, String>(configMap);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
index f97b1a8..27744a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index 569a3b0..f50ad15 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -80,4 +80,35 @@ public class TestNoDataAlert {
 //        }
 //        Thread.sleep(10000);
     }
-}
+
+    /**
+     * Runs the "missing blocks increased" pattern query: it matches when a later event for the same component,
+     * metric and host carries a larger value than an earlier one. Matches are only printed; nothing is asserted.
+     */
+    @Test
+    public void testMissingBlock() throws Exception{
+        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+                "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+
+                        "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+
+                        "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+
+                        "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+
+                        "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
+                        "b.site as site insert into outputStream;"
+        );
+
+        runtime.addCallback("outputStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                EventPrinter.print(events);
+            }
+        });
+
+        runtime.start();
+        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
+        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
+        runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
+
+
+        Thread.sleep(5000);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
index 6c48def..6305da8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -85,7 +85,7 @@ public class TestNoDataPolicyHandler {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("PT1M,provided,1,host,host1,host2");
-        def.setType("string");
+        def.setType("nodataalert");
         pd.setDefinition(def);
         pd.setInputStreams(Arrays.asList(inputStream));
         pd.setOutputStreams(Arrays.asList(outputStream));
@@ -97,7 +97,7 @@ public class TestNoDataPolicyHandler {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("PT1M,dynamic,1,host");
-        def.setType("string");
+        def.setType("nodataalert");
         pd.setDefinition(def);
         pd.setInputStreams(Arrays.asList(inputStream));
         pd.setOutputStreams(Arrays.asList(outputStream));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
new file mode 100644
index 0000000..82e3f15
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers": 20,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/rest",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "coordinatorService": {
+    "host": "localhost",
+    "port": "8080",
+    "context" : "/rest"
+  },
+  "kafkaProducer": {
+    "bootstrapServers": "sandbox.hortonworks.com:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
new file mode 100644
index 0000000..ed4d638
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
@@ -0,0 +1,17 @@
+[
+  {
+    "name": "absenceAlertDataSource",
+    "type": "KAFKA",
+    "properties": {},
+    "topic": "absenceAlertTopic",
+    "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "absenceAlertStream"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
new file mode 100644
index 0000000..a7ce7dc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
@@ -0,0 +1,24 @@
+[
+  {
+    "name": "absenceAlertPolicy",
+    "description": "absenceAlertPolicy",
+    "inputStreams": [
+      "absenceAlertStream"
+    ],
+    "outputStreams": [
+      "absenceAlertStream_out"
+    ],
+    "definition": {
+      "type": "absencealert",
+      "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+    },
+    "partitionSpec": [
+      {
+        "streamId": "absenceAlertStream",
+        "type": "GROUPBY",
+        "columns" : ["jobID"]
+      }
+    ],
+    "parallelismHint": 2
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
new file mode 100644
index 0000000..6e9260f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
@@ -0,0 +1,20 @@
+[
+  {
+    "name":"test-stream-output",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+    "policyIds": [
+      "absenceAlertPolicy"
+    ],
+    "properties": {
+      "subject":"UMP Test Alert",
+      "template":"",
+      "sender": "sender@corp.com",
+      "recipients": "yonzhang@ebay.com",
+      "smtp.server":"atom.corp.ebay.com",
+      "connection": "plaintext",
+      "smtp.port": "25"
+    },
+    "dedupIntervalMin" : "PT5M",
+    "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
new file mode 100644
index 0000000..4bd7319
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
@@ -0,0 +1,29 @@
+[
+  {
+    "streamId": "absenceAlertStream",
+    "dataSource": "absenceAlertDataSource",
+    "description": "the data stream for testing absence alert",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "jobID",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "status",
+        "type": "STRING",
+        "defaultValue": "running",
+        "required": true
+      }
+    ]
+  }
+]
\ No newline at end of file