You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/11 23:53:50 UTC

[2/2] incubator-eagle git commit: EAGLE-370 absence alert engine absence alert engine

EAGLE-370 absence alert engine
absence alert engine

Author: Yong Zhang <yo...@apache.org>
Reviewer: Yong Zhang

Closes: #262


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1dffec09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1dffec09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1dffec09

Branch: refs/heads/develop
Commit: 1dffec09ccb084c484e0427d2ce2cb567403cf21
Parents: 0b77d94
Author: yonzhang <yo...@gmail.com>
Authored: Mon Jul 11 16:56:46 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Mon Jul 11 16:56:46 2016 -0700

----------------------------------------------------------------------
 .../engine/evaluator/PolicyStreamHandlers.java  |   8 +-
 .../evaluator/absence/AbsenceAlertDriver.java   |  68 ++++++++
 .../evaluator/absence/AbsenceDailyRule.java     |  26 +++
 .../evaluator/absence/AbsencePolicyHandler.java | 134 +++++++++++++++
 .../engine/evaluator/absence/AbsenceRule.java   |  23 +++
 .../engine/evaluator/absence/AbsenceWindow.java |  38 +++++
 .../absence/AbsenceWindowGenerator.java         |  50 ++++++
 .../absence/AbsenceWindowProcessor.java         |  97 +++++++++++
 .../impl/DistinctValuesInTimeWindow.java        | 141 ---------------
 .../nodata/DistinctValuesInTimeWindow.java      | 141 +++++++++++++++
 .../evaluator/nodata/NoDataPolicyHandler.java   |  28 ++-
 .../publisher/impl/JsonEventSerializer.java     |  71 ++++++++
 .../alert/engine/absence/TestAbsenceDriver.java |  96 +++++++++++
 .../absence/TestAbsencePolicyHandler.java       | 111 ++++++++++++
 .../absence/TestAbsenceWindowGenerator.java     |  80 +++++++++
 .../absence/TestAbsenceWindowProcessor.java     |  70 ++++++++
 .../engine/e2e/Integration5AbsenceAlert.java    |  94 ++++++++++
 .../engine/e2e/SampleClient5AbsenceAlert.java   |  93 ++++++++++
 .../nodata/TestDistinctValuesInTimeWindow.java  |   2 +-
 .../alert/engine/nodata/TestNoDataAlert.java    |  33 +++-
 .../engine/nodata/TestNoDataPolicyHandler.java  |   4 +-
 .../resources/absence/application-absence.conf  |  60 +++++++
 .../src/test/resources/absence/datasources.json |  17 ++
 .../src/test/resources/absence/policies.json    |  24 +++
 .../test/resources/absence/publishments.json    |  20 +++
 .../resources/absence/streamdefinitions.json    |  29 ++++
 .../src/test/resources/absence/topologies.json  |  31 ++++
 .../src/test/resources/nodata/policies.json     |   5 +-
 .../eagle-machinelearning-base/pom.xml          |  37 ----
 .../apache/eagle/ml/MLAlgorithmEvaluator.java   |  50 ------
 .../org/apache/eagle/ml/MLAnomalyCallback.java  |  28 ---
 .../java/org/apache/eagle/ml/MLConstants.java   |  24 ---
 .../java/org/apache/eagle/ml/MLModelDAO.java    |  30 ----
 .../org/apache/eagle/ml/MLPolicyEvaluator.java  | 170 -------------------
 .../eagle/ml/impl/MLAnomalyCallbackImpl.java    | 107 ------------
 .../apache/eagle/ml/impl/MLModelDAOImpl.java    |  62 -------
 .../MLPolicyEvaluatorServiceProviderImpl.java   |  52 ------
 .../org/apache/eagle/ml/model/MLAlgorithm.java  |  61 -------
 .../apache/eagle/ml/model/MLCallbackResult.java | 137 ---------------
 .../eagle/ml/model/MLEntityRepository.java      |  25 ---
 .../apache/eagle/ml/model/MLModelAPIEntity.java |  67 --------
 .../eagle/ml/model/MLPolicyDefinition.java      |  82 ---------
 .../eagle/ml/utils/MLReflectionUtils.java       |  38 -----
 ....eagle.policy.PolicyEvaluatorServiceProvider |  16 --
 ....eagle.policy.PolicyEvaluatorServiceProvider |  16 --
 .../src/test/resources/application.conf         |  57 -------
 .../src/test/resources/log4j.properties         |  34 ----
 .../test/resources/ml-policyDef-UserProfile.txt |  51 ------
 eagle-core/eagle-machinelearning/pom.xml        |  33 ----
 eagle-core/pom.xml                              |   1 -
 ....eagle.policy.PolicyEvaluatorServiceProvider |   3 +-
 51 files changed, 1443 insertions(+), 1332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index e8f736c..638b240 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -19,19 +19,23 @@ package org.apache.eagle.alert.engine.evaluator;
 import java.util.Map;
 
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
 
 public class PolicyStreamHandlers {
     public static final String SIDDHI_ENGINE ="siddhi";
     public static final String NO_DATA_ALERT_ENGINE ="nodataalert";
+    public static final String ABSENCE_ALERT_ENGINE ="absencealert";
 
     public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){
         if(SIDDHI_ENGINE.equals(type)) {
             return new SiddhiPolicyHandler(sds);
         }else if(NO_DATA_ALERT_ENGINE.equals(type)){
             return new NoDataPolicyHandler(sds);
+        }else if(ABSENCE_ALERT_ENGINE.equals(type)){
+            return new AbsencePolicyHandler(sds);
         }
-        throw new IllegalArgumentException("Illegal policy stream handler type: "+type);
+        throw new IllegalArgumentException("Illegal policy stream handler type " + type);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
new file mode 100644
index 0000000..bf142cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/7/16.
+ * this assumes that events arrive in time order
+ */
+public class AbsenceAlertDriver {
+    private static final Logger LOG = LoggerFactory.getLogger(AbsenceAlertDriver.class);
+    private List<Object> expectedAttrs;
+    private AbsenceWindowProcessor processor;
+    private AbsenceWindowGenerator windowGenerator;
+
+    public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator){
+        this.expectedAttrs = expectedAttrs;
+        this.windowGenerator = windowGenerator;
+    }
+
+    public void process(List<Object> appearAttrs, long occurTime){
+        // initialize window
+        if(processor == null){
+            processor = nextProcessor(occurTime);
+            LOG.info("initialized a new window {}", processor);
+        }
+        processor.process(appearAttrs, occurTime);
+        AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
+        boolean expired = processor.checkExpired();
+        if(expired){
+            if(status == AbsenceWindowProcessor.OccurStatus.absent){
+                // send alert
+                LOG.info("this is an alert");
+                // figure out next window and set the new window
+            }
+            processor = nextProcessor(occurTime);
+            LOG.info("created a new window {}", processor);
+        }
+    }
+
+    /**
+     * calculate absolute time range based on current timestamp
+     * @param currTime milliseconds
+     * @return
+     */
+    private AbsenceWindowProcessor nextProcessor(long currTime){
+        AbsenceWindow window = windowGenerator.nextWindow(currTime);
+        return new AbsenceWindowProcessor(expectedAttrs, window);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
new file mode 100644
index 0000000..ed50280
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceDailyRule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceDailyRule implements AbsenceRule {
+    public static final long DAY_MILLI_SECONDS = 86400*1000L;
+    public long startOffset;
+    public long endOffset;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
new file mode 100644
index 0000000..0a07a27
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Since 7/6/16.
+ * policy would be like:
+ * {
+ "name": "absenceAlertPolicy",
+ "description": "absenceAlertPolicy",
+ "inputStreams": [
+ "absenceAlertStream"
+ ],
+ "outputStreams": [
+ "absenceAlertStream_out"
+ ],
+ "definition": {
+ "type": "absencealert",
+ "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "absenceAlertStream",
+ "type": "GROUPBY",
+ "columns" : ["jobID"]
+ }
+ ],
+ "parallelismHint": 2
+ }
+ */
+public class AbsencePolicyHandler implements PolicyStreamHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AbsencePolicyHandler.class);
+    private Map<String, StreamDefinition> sds;
+    private volatile PolicyDefinition policyDef;
+    private volatile Collector<AlertStreamEvent> collector;
+    private volatile PolicyHandlerContext context;
+    private volatile List<Integer> expectFieldIndices = new ArrayList<>();
+    private volatile List<Object> expectValues = new ArrayList<>();
+    private AbsenceAlertDriver driver;
+
+    public AbsencePolicyHandler(Map<String, StreamDefinition> sds){
+        this.sds = sds;
+    }
+
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+        this.context = context;
+        this.policyDef = context.getPolicyDefinition();
+        List<String> inputStreams = policyDef.getInputStreams();
+        // validate inputStreams has to contain only one stream
+        if(inputStreams.size() != 1)
+            throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
+        // validate outputStream has to contain only one stream
+        if(policyDef.getOutputStreams().size() != 1)
+            throw new IllegalArgumentException("policy outputStream size has to be 1 for absense alert");
+
+        String is = inputStreams.get(0);
+        StreamDefinition sd = sds.get(is);
+
+        String policyValue = policyDef.getDefinition().getValue();
+
+        // assume that the absence alert policy value consists of "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset"
+        String[] segments = policyValue.split(",");
+        int offset = 0;
+        // populate wisb field names
+        int numOfFields = Integer.parseInt(segments[offset++]);
+        for(int i = offset; i < offset+numOfFields; i++){
+            String fn = segments[i];
+            expectFieldIndices.add(sd.getColumnIndex(fn));
+        }
+        offset += numOfFields;
+        for(int i = offset; i < offset+numOfFields; i++){
+            String fn = segments[i];
+            expectValues.add(fn);
+        }
+        offset += numOfFields;
+        String absence_window_rule_type = segments[offset++];
+        AbsenceDailyRule rule = new AbsenceDailyRule();
+        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date t1 = sdf.parse(segments[offset++]);
+        rule.startOffset = t1.getTime();
+        Date t2 = sdf.parse(segments[offset++]);
+        rule.endOffset = t2.getTime();
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+        driver = new AbsenceAlertDriver(expectValues, generator);
+    }
+
+    @Override
+    public void send(StreamEvent event) throws Exception {
+        Object[] data = event.getData();
+        List<Object> columnValues = new ArrayList<>();
+        for(int i=0; i<expectFieldIndices.size(); i++){
+            Object o = data[expectFieldIndices.get(i)];
+            // convert value to string
+            columnValues.add(o.toString());
+        }
+
+        driver.process(columnValues, event.getTimestamp());
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
new file mode 100644
index 0000000..272d5cf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceRule.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public interface AbsenceRule {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
new file mode 100644
index 0000000..728e702
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindow.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindow {
+    public long startTime;
+    public long endTime;
+
+    public String toString(){
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+        String t1 = sdf.format(new Date(startTime));
+        String t2 = sdf.format(new Date(endTime));
+        String format = "startTime=%d (%s), endTime=%d (%s)";
+        return String.format(format, startTime, t1, endTime, t2);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
new file mode 100644
index 0000000..6cd0880
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowGenerator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+/**
+ * Since 7/7/16.
+ */
+public class AbsenceWindowGenerator {
+    private AbsenceRule rule;
+    public AbsenceWindowGenerator(AbsenceRule rule){
+        this.rule = rule;
+    }
+
+    /**
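+     * @param currTime timestamp used to pick the day of the window
+     * @return for a daily rule, the window on currTime's day, or on the following day when currTime's time-of-day is already past the rule's startOffset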
+     */
+    public AbsenceWindow nextWindow(long currTime){
+        AbsenceWindow window = new AbsenceWindow();
+        if(rule instanceof AbsenceDailyRule){
+            AbsenceDailyRule r = (AbsenceDailyRule)rule;
+            long adjustment = 0; // if today's window already expires, then adjust to tomorrow's window
+            if(currTime % AbsenceDailyRule.DAY_MILLI_SECONDS > r.startOffset){
+                adjustment = AbsenceDailyRule.DAY_MILLI_SECONDS;
+            }
+            // use current timestamp to round down to day
+            long day = currTime - currTime % AbsenceDailyRule.DAY_MILLI_SECONDS;
+            day += adjustment;
+            window.startTime = day + r.startOffset;
+            window.endTime = day + r.endOffset;
+            return window;
+        }else{
+            throw new UnsupportedOperationException("not supported rule " + rule);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
new file mode 100644
index 0000000..4e8d381
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceWindowProcessor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.absence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Since 7/6/16.
+ * To process each incoming event
+ * internally maintain state machine to trigger alert when some attribute does not occur within this window
+ */
+public class AbsenceWindowProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(AbsenceWindowProcessor.class);
+    private List<Object> expectAttrs;
+    private AbsenceWindow window;
+    private boolean expired; // to mark if the time range has been went through
+    private OccurStatus status = OccurStatus.not_sure;
+
+    public enum OccurStatus{
+        not_sure,
+        occured,
+        absent
+    }
+
+    public AbsenceWindowProcessor(List<Object> expectAttrs, AbsenceWindow window){
+        this.expectAttrs = expectAttrs;
+        this.window = window;
+        expired = false;
+    }
+
+    /**
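+     * update this window's state from one incoming event: status becomes occured when the expected attributes appear between startTime and endTime, and absent once an event later than endTime arrives before that happened; nothing is returned, callers read checkStatus() and checkExpired()
+     * @param appearAttrs attribute values carried by the incoming event, compared against the expected attributes
+     * @param occurTime timestamp of the incoming event, compared against the window's startTime and endTime
+     * @throws IllegalStateException if this window is already expired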
+     */
+    public void process(List<Object> appearAttrs, long occurTime){
+        if(expired)
+            throw new IllegalStateException("Expired window can't recieve events");
+        switch(status) {
+            case not_sure:
+                if(occurTime < window.startTime) {
+                    break;
+                }else if(occurTime >= window.startTime &&
+                        occurTime <= window.endTime) {
+                    if(expectAttrs.equals(appearAttrs)) {
+                        status = OccurStatus.occured;
+                    }
+                    break;
+                }else{
+                    status = OccurStatus.absent;
+                    break;
+                }
+            case occured:
+                if(occurTime > window.endTime)
+                    expired = true;
+                break;
+            default:
+                break;
+        }
+        // an absent window is finished as well, so mark it expired
+        if(status == OccurStatus.absent){
+            expired = true;
+        }
+    }
+
+    public OccurStatus checkStatus(){
+        return status;
+    }
+    public boolean checkExpired(){
+        return expired;
+    }
+    public AbsenceWindow currWindow(){
+        return window;
+    }
+
+    public String toString(){
+        return "expectAttrs=" + expectAttrs + ", status=" + status + ", expired=" + expired + ", window=[" + window + "]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
deleted file mode 100644
index 8a681da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
-/**
- * Since 6/28/16.
- * to get distinct values within a specified time window
- * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had
- * timeSortedMap : map sorted by timestamp first and then value
- * With the above 2 data structure, we can get distinct values in LOG(N)
- */
-public class DistinctValuesInTimeWindow {
-    public static class ValueAndTime{
-        Object value;
-        long timestamp;
-        public ValueAndTime(Object value, long timestamp){
-            this.value = value;
-            this.timestamp = timestamp;
-        }
-
-        public String toString(){
-            return "[" + value + "," + timestamp + "]";
-        }
-
-        public int hashCode(){
-            return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
-        }
-
-        public boolean equals(Object that){
-            if(!(that instanceof ValueAndTime))
-                return false;
-            ValueAndTime another = (ValueAndTime)that;
-            return another.timestamp == this.timestamp && another.value.equals(this.value);
-        }
-    }
-
-    public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
-        @Override
-        public int compare(ValueAndTime o1, ValueAndTime o2) {
-            if(o1.timestamp != o2.timestamp)
-                return (o1.timestamp > o2.timestamp) ? 1 : -1;
-            if(o1.value.equals(o2.value))
-                return 0;
-            else {
-                // this is not strictly correct, but I don't want to write too many comparators here :-)
-                if(o1.hashCode() > o2.hashCode())
-                    return 1;
-                else
-                    return -1;
-            }
-        }
-    }
-
-    /**
-     * map from value to max timestamp for this value
-     */
-    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
-    /**
-     * map sorted by time(max timestamp for the value) and then value
-     */
-    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
-    private long maxTimestamp = 0L;
-    private long window;
-    private boolean windowSlided;
-
-    /**
-     * @param window - milliseconds
-     */
-    public DistinctValuesInTimeWindow(long window){
-        this.window = window;
-    }
-
-    public void send(Object value, long timestamp){
-        ValueAndTime vt = new ValueAndTime(value, timestamp);
-
-        // todo think of time out of order
-        if(valueMaxTimeMap.containsKey(value)){
-            // remove that entry with old timestamp in timeSortedMap
-            long oldTime = valueMaxTimeMap.get(value);
-            if(oldTime >= timestamp){
-                // no any effect as the new timestamp is equal or even less than old timestamp
-                return;
-            }
-            timeSortedMap.remove(new ValueAndTime(value, oldTime));
-        }
-        // insert entry with new timestamp in timeSortedMap
-        timeSortedMap.put(vt, vt);
-        // update new timestamp in valueMaxTimeMap
-        valueMaxTimeMap.put(value, timestamp);
-
-        // evict old entries
-        // store max timestamp if possible
-        maxTimestamp = Math.max(maxTimestamp, timestamp);
-
-        // check if some values should be evicted because of time window
-        Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
-        while(it.hasNext()){
-            Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
-            if(entry.getKey().timestamp < maxTimestamp - window){
-                // should remove the entry in valueMaxTimeMap and timeSortedMap
-                valueMaxTimeMap.remove(entry.getKey().value);
-                windowSlided = true;
-
-                it.remove();
-            }else {
-                break;
-            }
-        }
-    }
-
-    public Map<Object, Long> distinctValues(){
-        return valueMaxTimeMap;
-    }
-
-    public boolean windowSlided(){
-        return windowSlided;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..676357a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeWindow.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * Since 6/28/16.
+ * Tracks the distinct values seen within a sliding time window.
+ * valueMaxTimeMap : maps each distinct value to the largest timestamp recorded for it
+ * timeSortedMap : the same entries sorted by timestamp first and then by value
+ * With these two data structures an update only needs sorted-map put/remove, i.e. LOG(N) work per entry touched
+ */
+public class DistinctValuesInTimeWindow {
+    public static class ValueAndTime{
+        Object value;
+        long timestamp;
+        public ValueAndTime(Object value, long timestamp){
+            this.value = value;
+            this.timestamp = timestamp;
+        }
+        // rendered as "[value,timestamp]"
+        public String toString(){
+            return "[" + value + "," + timestamp + "]";
+        }
+        // hash is built from value and timestamp, the same two fields equals() compares
+        public int hashCode(){
+            return new HashCodeBuilder().append(value).append(timestamp).toHashCode();
+        }
+        // NOTE(review): calls another.value.equals(...), so a null value would throw here
+        public boolean equals(Object that){
+            if(!(that instanceof ValueAndTime))
+                return false;
+            ValueAndTime another = (ValueAndTime)that;
+            return another.timestamp == this.timestamp && another.value.equals(this.value);
+        }
+    }
+    // orders by timestamp first; unequal values with the same timestamp are ordered by hashCode only
+    public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{
+        @Override
+        public int compare(ValueAndTime o1, ValueAndTime o2) {
+            if(o1.timestamp != o2.timestamp)
+                return (o1.timestamp > o2.timestamp) ? 1 : -1;
+            if(o1.value.equals(o2.value))
+                return 0;
+            else {
+                // not strictly correct: two unequal values whose hash codes are equal compare as -1 in both directions
+                if(o1.hashCode() > o2.hashCode())
+                    return 1;
+                else
+                    return -1;
+            }
+        }
+    }
+
+    /**
+     * Map from each value to the max timestamp recorded for that value.
+     */
+    private Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+    /**
+     * Map sorted by time (max timestamp for the value) and then by value; key and mapped value are the same object.
+     */
+    private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator());
+    private long maxTimestamp = 0L;
+    private long window;
+    private boolean windowSlided;
+
+    /**
+     * @param window - window length in milliseconds
+     */
+    public DistinctValuesInTimeWindow(long window){
+        this.window = window;
+    }
+    // records value at timestamp, then evicts every entry whose timestamp is older than (maxTimestamp - window)
+    public void send(Object value, long timestamp){
+        ValueAndTime vt = new ValueAndTime(value, timestamp);
+
+        // TODO: think about events that arrive out of time order
+        if(valueMaxTimeMap.containsKey(value)){
+            // remove the entry that carries the old timestamp from timeSortedMap
+            long oldTime = valueMaxTimeMap.get(value);
+            if(oldTime >= timestamp){
+                // no effect: the new timestamp is equal to or less than the one already recorded
+                return;
+            }
+            timeSortedMap.remove(new ValueAndTime(value, oldTime));
+        }
+        // insert entry with new timestamp in timeSortedMap
+        timeSortedMap.put(vt, vt);
+        // update new timestamp in valueMaxTimeMap
+        valueMaxTimeMap.put(value, timestamp);
+
+        // remember the largest timestamp seen so far;
+        // the eviction below measures the window back from it
+        maxTimestamp = Math.max(maxTimestamp, timestamp);
+
+        // walk from the oldest entry and stop at the first one that is still inside the window
+        Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator();
+        while(it.hasNext()){
+            Map.Entry<ValueAndTime, ValueAndTime> entry = it.next();
+            if(entry.getKey().timestamp < maxTimestamp - window){
+                // the entry must leave both valueMaxTimeMap and timeSortedMap
+                valueMaxTimeMap.remove(entry.getKey().value);
+                windowSlided = true;
+
+                it.remove();
+            }else {
+                break;
+            }
+        }
+    }
+    // NOTE: returns the internal valueMaxTimeMap itself, not a copy
+    public Map<Object, Long> distinctValues(){
+        return valueMaxTimeMap;
+    }
+    // true once any entry has been evicted; nothing in this class sets it back to false
+    public boolean windowSlided(){
+        return windowSlided;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index ed13f71..6e5beb6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -29,7 +29,6 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
@@ -50,6 +49,29 @@ import org.slf4j.LoggerFactory;
  * fixed fields and dynamic fields
  * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name
  * dynamic fields depend on wisb type.
+ *
+ * policy would be like:
+ * {
+ "name": "noDataAlertPolicy",
+ "description": "noDataAlertPolicy",
+ "inputStreams": [
+ "noDataAlertStream"
+ ],
+ "outputStreams": [
+ "noDataAlertStream_out"
+ ],
+ "definition": {
+ "type": "nodataalert",
+ "value": "PT1M,plain,1,host,host1,host2"   // or "value": "PT1M,dynamic,1,host"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "noDataAlertStream",
+ "type": "GROUPBY"
+ }
+ ],
+ "parallelismHint": 2
+ }
  */
 public class NoDataPolicyHandler implements PolicyStreamHandler{
     private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);
@@ -61,10 +83,10 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
     private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
     // reuse PolicyDefinition.defintion.value field to store full set of values separated by comma
     private volatile PolicyDefinition policyDef;
-    private volatile DistinctValuesInTimeWindow distinctWindow;
     private volatile Collector<AlertStreamEvent> collector;
     private volatile PolicyHandlerContext context;
     private volatile NoDataWisbType wisbType;
+    private volatile DistinctValuesInTimeWindow distinctWindow;
 
     public NoDataPolicyHandler(Map<String, StreamDefinition> sds){
         this.sds = sds;
@@ -161,4 +183,4 @@ public class NoDataPolicyHandler implements PolicyStreamHandler{
     public void close() throws Exception {
 
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
new file mode 100644
index 0000000..bf2a954
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializes an {@link AlertStreamEvent} to a flat JSON string: alert metadata plus one entry per data column.
+ * @since Jul 9, 2016
+ */
+public class JsonEventSerializer implements IEventSerializer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JsonEventSerializer.class);
+    /** The configuration map is accepted but not read by this implementation. */
+    @SuppressWarnings("rawtypes")
+    public JsonEventSerializer(Map stormConf) throws Exception {
+    }
+    /** Returns the JSON string built by {@link #streamEventToJson(AlertStreamEvent)} and logs it at debug level. */
+    @Override
+    public Object serialize(AlertStreamEvent event) {
+        String result = streamEventToJson(event);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("serialized alert event : {}", result);
+        }
+        return result;
+    }
+    /** Builds a map of policyId, streamId, createBy, createTime and every named data column, then writes it as JSON. */
+    public String streamEventToJson(AlertStreamEvent event) {
+        Map<String, Object> jsonMap = new HashMap<String, Object>();
+        jsonMap.put("policyId", event.getPolicyId());
+        jsonMap.put("streamId", event.getStreamId());
+        jsonMap.put("createBy", event.getCreatedBy());
+        jsonMap.put("createTime", event.getCreatedTime());
+        // copy each data value under the name of the schema column at the same index
+        int size = event.getData().length;
+        List<StreamColumn> columns = event.getSchema().getColumns();
+        for (int i = 0; i < size; i++) {
+            if (i >= columns.size()) {
+                // more data values than schema columns: report once and stop instead of indexing past the list
+                LOG.error("stream event data length {} exceeds column definition size {}", size, columns.size());
+                break;
+            }
+            jsonMap.put(columns.get(i).getName(), event.getData()[i]);
+        }
+        return JsonUtils.writeValueAsString(jsonMap);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
new file mode 100644
index 0000000..ca5bfdf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceAlertDriver;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16. Feeds hourly events to AbsenceAlertDriver around a daily 2PM-3PM rule. NOTE(review): neither test asserts on the outcome.
+ */
+public class TestAbsenceDriver {
+    @Test
+    public void testAbsence() throws Exception{
+        // from 2PM to 3PM each day
+        AbsenceDailyRule rule = new AbsenceDailyRule();
+        rule.startOffset = 14*3600*1000;
+        rule.endOffset = 15*3600*1000;
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+        List<Object> expectAttrs = Arrays.asList("host1");
+        AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
+        // "host1" is expected but every event below carries "host2", so host1 never appears
+        // first event came in 2016-07-08 11:20:00
+        String date = "2016-07-08 11:20:00";
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date d = df.parse(date);
+        long baseOccurTime = d.getTime();
+
+        // first event
+        driver.process(Arrays.asList("host2"), baseOccurTime);
+        // event after 1 hour
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+        // event after 2 hour
+        driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+        // event after 3 hour, enter this window
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+        // event after 3.5 hour, still in this window
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000 + 1800*1000);
+        // event after 4 hour, exit this window
+        driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+    }
+
+    @Test
+    public void testOccurrence() throws Exception{
+        // from 2PM to 3PM each day
+        AbsenceDailyRule rule = new AbsenceDailyRule();
+        rule.startOffset = 14*3600*1000;
+        rule.endOffset = 15*3600*1000;
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+        List<Object> expectAttrs = Arrays.asList("host1");
+        AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
+        // "host1" is expected and is sent exactly once below, at 14:50:00
+        // first event came in 2016-07-08 11:20:00
+        String date = "2016-07-08 11:20:00";
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date d = df.parse(date);
+        long baseOccurTime = d.getTime();
+
+        // first event
+        driver.process(Arrays.asList("host2"), baseOccurTime);
+        // event after 1 hour
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3600*1000);
+        // event after 2 hour
+        driver.process(Arrays.asList("host2"), baseOccurTime + 2*3600*1000);
+        // event after 3 hour, enter this window
+        driver.process(Arrays.asList("host2"), baseOccurTime + 3*3600*1000);
+        // event after 3.5 hour, still in this window
+        driver.process(Arrays.asList("host1"), baseOccurTime + 3*3600*1000 + 1800*1000);
+        // event after 4 hour, exit this window
+        driver.process(Arrays.asList("host2"), baseOccurTime + 4*3600*1000);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
new file mode 100644
index 0000000..7f325c4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Since 7/8/16. Smoke test: wires an AbsencePolicyHandler to a test collector and sends it a single event.
+ */
+public class TestAbsencePolicyHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(TestAbsencePolicyHandler.class);
+    private static final String inputStream = "testInputStream";
+    private static final String outputStream = "testOutputStream";
+
+    @Test
+    public void test() throws Exception{
+        test(buildPolicyDef_provided());
+    }
+    // prepares a handler with the given policy and sends one event (ts=0, jobID "job1", status "running")
+    public void test(PolicyDefinition pd) throws Exception{
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        StreamDefinition sd = buildStreamDef();
+        sds.put("testInputStream", sd);
+        AbsencePolicyHandler handler = new AbsencePolicyHandler(sds);
+
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(pd);
+        handler.prepare(new TestCollector(), context);
+
+        handler.send(buildStreamEvt(0, "job1", "running"));
+    }
+    // NOTE(review): asserts "host2" although the only event sent carries "job1"; presumably emit() is never reached - confirm
+    private static class TestCollector implements Collector {
+        @Override
+        public void emit(Object o) {
+            AlertStreamEvent e = (AlertStreamEvent)o;
+            Object[] data = e.getData();
+            Assert.assertEquals("host2", data[1]);
+            LOG.info(e.toString());
+        }
+    }
+    // builds an "absencealert" policy; the value presumably means: 1 field (jobID), expected job1, daily rule 14:00-15:00 - confirm
+    private PolicyDefinition buildPolicyDef_provided(){
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+        def.setType("absencealert");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList(inputStream));
+        pd.setOutputStreams(Arrays.asList(outputStream));
+        pd.setName("absencealert-test");
+        return pd;
+    }
+    // stream schema used by the test: timestamp (LONG), jobID (STRING), status (STRING)
+    private StreamDefinition buildStreamDef(){
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("jobID");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn valueColumn = new StreamColumn();
+        valueColumn.setName("status");
+        valueColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+        sd.setDataSource("testDataSource");
+        sd.setStreamId("testStreamId");
+        return sd;
+    }
+    // builds an event on the input stream whose data is {ts, jobID, status} and whose timestamp is ts
+    private StreamEvent buildStreamEvt(long ts, String jobID, String status){
+        StreamEvent e = new StreamEvent();
+        e.setData(new Object[]{ts, jobID, status});
+        e.setStreamId(inputStream);
+        e.setTimestamp(ts);
+        return e;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
new file mode 100644
index 0000000..e2345c9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Since 7/8/16. Checks which day's window AbsenceWindowGenerator.nextWindow returns for a daily 2PM-3PM rule.
+ */
+public class TestAbsenceWindowGenerator {
+    @Test
+    public void testWindowInToday() throws Exception{
+        AbsenceDailyRule rule = new AbsenceDailyRule();
+        // from 2PM to 3PM each day
+        rule.startOffset = 14*3600*1000;
+        rule.endOffset = 15*3600*1000;
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+
+        // midnight (UTC) of the test day, the base for the expected window start
+        String date = "2016-07-08 00:00:00";
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date d = df.parse(date);
+        long startTimeOfDay = d.getTime();
+        // 11:30:29 is before the 2PM start, so the next window is expected on the same day
+        String currDate = "2016-07-08 11:30:29";
+        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        d = df.parse(currDate);
+        AbsenceWindow window = generator.nextWindow(d.getTime());
+        Assert.assertEquals(startTimeOfDay+rule.startOffset, window.startTime);
+    }
+
+    @Test
+    public void testWindowInTomorrow() throws Exception{
+        AbsenceDailyRule rule = new AbsenceDailyRule();
+        // from 2PM to 3PM each day
+        rule.startOffset = 14*3600*1000;
+        rule.endOffset = 15*3600*1000;
+        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
+
+        // midnight (UTC) of the test day, the base for the expected window start
+        String date = "2016-07-08 00:00:00";
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        Date d = df.parse(date);
+        long startTimeOfDay = d.getTime();
+        // 18:20:19 is after the 3PM end, so the next window is expected on the following day
+        String currDate = "2016-07-08 18:20:19";
+        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        df.setTimeZone(TimeZone.getTimeZone("UTC"));
+        d = df.parse(currDate);
+        AbsenceWindow window = generator.nextWindow(d.getTime());
+        // the expected start is therefore shifted forward by one day
+        Assert.assertEquals(startTimeOfDay+rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
new file mode 100644
index 0000000..a47c7a4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.absence;
+
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
+import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Since 7/6/16. Checks the status AbsenceWindowProcessor reports around a window of [100, 200].
+ */
+public class TestAbsenceWindowProcessor {
+    @Test
+    public void testDataMissing(){
+        List<Object> expectedHosts = Arrays.asList("host1");
+        AbsenceWindow window = new AbsenceWindow();
+        window.startTime = 100L;
+        window.endTime = 200L;
+        AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+        processor.process(Arrays.asList("host2"), 90); // timestamp precedes window.startTime
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host3"), 101);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host3"), 138);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 189);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 201); // past window.endTime, host1 was never sent
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.absent, processor.checkStatus());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testDataExists(){
+        List<Object> expectedHosts = Arrays.asList("host1");
+        AbsenceWindow window = new AbsenceWindow();
+        window.startTime = 100L;
+        window.endTime = 200L;
+        AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
+        processor.process(Arrays.asList("host2"), 90); // timestamp precedes window.startTime
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host3"), 101);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.not_sure, processor.checkStatus());
+        processor.process(Arrays.asList("host1"), 138); // the expected host, inside the window
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.occured, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 189);
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.occured, processor.checkStatus());
+        processor.process(Arrays.asList("host2"), 201); // past window.endTime
+        Assert.assertEquals(AbsenceWindowProcessor.OccurStatus.occured, processor.checkStatus());
+        Assert.assertTrue(processor.checkExpired());
+        processor.process(Arrays.asList("host2"), 225); // presumably the call that throws IllegalStateException
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
new file mode 100644
index 0000000..52d1e5d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Since 6/29/16. End-to-end run of the absence alert topology; the only test is @Ignore'd.
+ */
+public class Integration5AbsenceAlert {
+    // never assigned in this class, so both main(...) calls below receive null
+    private String[] args;
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+    private static KafkaEmbedded kafka;
+
+    @BeforeClass
+    public static void setup() {
+        // FIXME : start local kafka
+    }
+
+    @AfterClass
+    public static void end() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
+    @Test @Ignore
+    public void testTriggerAbsenceAlert() throws Exception{
+        System.setProperty("config.resource", "/absence/application-absence.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+
+        System.out.println("loading metadatas...");
+        Integration1.loadMetadatas("/absence/", config);
+        System.out.println("loading metadatas done!");
+
+        // start the topology on a pool thread so this test thread can continue
+        executors.submit(() -> {
+            try {
+                UnitTopologyMain.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        // wait 20 seconds for topology to bring up; an interrupt is not swallowed here:
+        // it propagates through "throws Exception" and aborts the test, so no mock
+        // data is sent for a run that is being cancelled
+        Thread.sleep(20000);
+
+        // send mock data
+        executors.submit(() -> {
+            try {
+                SampleClient5AbsenceAlert.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        // the loop below never exits; the test is @Ignore'd and presumably meant to be run by hand
+        Utils.sleep(1000 * 5L);
+        while (true) {
+            Integration1.proactive_schedule(config);
+
+            Utils.sleep(1000 * 60L * 5);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
new file mode 100644
index 0000000..0256324
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Since 6/29/16. Sends three sample absence events to the Kafka topic "absenceAlertTopic".
+ */
+public class SampleClient5AbsenceAlert {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class);
+    private static long currentTimestamp = 1467240000000L;
+    private static long interval = 3000L;
+    public static void main(String[] args) throws Exception {
+        System.setProperty("config.resource", "/absence/application-absence.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+        KafkaProducer<String, String> producer = createProducer(config);
+        try {
+            for (String jobID : new String[]{"job1", "job2", "host3"}) {
+                producer.send(new ProducerRecord<String, String>("absenceAlertTopic", createEvent(jobID)));
+            }
+        } finally {
+            // send() is asynchronous; close() blocks until the buffered records have been sent
+            producer.close();
+        }
+    }
+
+    private static class AbsenceEvent{
+        @JsonProperty
+        long timestamp;
+        @JsonProperty
+        String jobID;
+        @JsonProperty
+        String status;
+
+        public String toString(){
+            return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status;
+        }
+    }
+
+    private static String createEvent(String jobID) throws Exception{
+        AbsenceEvent e = new AbsenceEvent();
+        long expectTS = currentTimestamp + interval;
+        // subtract a random jitter of 0, 1 or 2 from the expected timestamp
+        long adjust = Math.round(2*Math.random());
+        e.timestamp = expectTS-adjust;
+        e.jobID = jobID;
+        e.status = "running";
+        LOG.info("sending event {} ",  e);
+        ObjectMapper mapper = new ObjectMapper();
+        String value = mapper.writeValueAsString(e);
+        return value;
+    }
+
+    /** Builds a String/String producer for the servers in "kafkaProducer.bootstrapServers". */
+    public static KafkaProducer<String, String> createProducer(Config config) {
+        String servers = config.getString("kafkaProducer.bootstrapServers");
+        Properties configMap = new Properties();
+        configMap.put("bootstrap.servers", servers);
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        // "acks" is the KafkaProducer key; "request.required.acks" belongs to the old Scala producer
+        configMap.put("acks", "1");
+        // the key/value deserializer entries were dropped: they are consumer-only settings
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configMap);
+        return producer;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
index f97b1a8..27744a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index 569a3b0..f50ad15 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -80,4 +80,35 @@ public class TestNoDataAlert {
 //        }
 //        Thread.sleep(10000);
     }
-}
+
+    /**
+     * Matches when a later event reports more missing blocks than an earlier one (same component, metric, host); based on:
+     * from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
+     */
+    @Test
+    public void testMissingBlock() throws Exception{
+        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+                "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+
+                        "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+
+                        "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+
+                        "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+
+                        "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
+                        "b.site as site insert into outputStream;"
+        );
+        runtime.addCallback("outputStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                EventPrinter.print(events); // matches are only printed; this test asserts nothing
+            }
+        });
+        runtime.start();
+        try {
+            runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
+            runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
+            runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
+            Thread.sleep(5000);
+        } finally {
+            runtime.shutdown(); // stop the started runtime even when sending or sleeping fails
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
index 6c48def..6305da8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -85,7 +85,7 @@ public class TestNoDataPolicyHandler {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("PT1M,provided,1,host,host1,host2");
-        def.setType("string");
+        def.setType("nodataalert");
         pd.setDefinition(def);
         pd.setInputStreams(Arrays.asList(inputStream));
         pd.setOutputStreams(Arrays.asList(outputStream));
@@ -97,7 +97,7 @@ public class TestNoDataPolicyHandler {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         def.setValue("PT1M,dynamic,1,host");
-        def.setType("string");
+        def.setType("nodataalert");
         pd.setDefinition(def);
         pd.setInputStreams(Arrays.asList(inputStream));
         pd.setOutputStreams(Arrays.asList(outputStream));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
new file mode 100644
index 0000000..82e3f15
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/application-absence.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers": 20,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/rest",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "coordinatorService": {
+    "host": "localhost",
+    "port": "8080",
+    "context" : "/rest"
+  },
+  "kafkaProducer": {
+    "bootstrapServers": "sandbox.hortonworks.com:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
new file mode 100644
index 0000000..ed4d638
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/datasources.json
@@ -0,0 +1,17 @@
+[
+  {
+    "name": "absenceAlertDataSource",
+    "type": "KAFKA",
+    "properties": {},
+    "topic": "absenceAlertTopic",
+    "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "absenceAlertStream"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
new file mode 100644
index 0000000..a7ce7dc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/policies.json
@@ -0,0 +1,24 @@
+[
+  {
+    "name": "absenceAlertPolicy",
+    "description": "absenceAlertPolicy",
+    "inputStreams": [
+      "absenceAlertStream"
+    ],
+    "outputStreams": [
+      "absenceAlertStream_out"
+    ],
+    "definition": {
+      "type": "absencealert",
+      "value": "1,jobID,job1,daily_rule,14:00:00,15:00:00"
+    },
+    "partitionSpec": [
+      {
+        "streamId": "absenceAlertStream",
+        "type": "GROUPBY",
+        "columns" : ["jobID"]
+      }
+    ],
+    "parallelismHint": 2
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
new file mode 100644
index 0000000..6e9260f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/publishments.json
@@ -0,0 +1,20 @@
+[
+  {
+    "name":"test-stream-output",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+    "policyIds": [
+      "absenceAlertPolicy"
+    ],
+    "properties": {
+      "subject":"UMP Test Alert",
+      "template":"",
+      "sender": "sender@corp.com",
+      "recipients": "yonzhang@ebay.com",
+      "smtp.server":"atom.corp.ebay.com",
+      "connection": "plaintext",
+      "smtp.port": "25"
+    },
+    "dedupIntervalMin" : "PT5M",
+    "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1dffec09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
new file mode 100644
index 0000000..4bd7319
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/absence/streamdefinitions.json
@@ -0,0 +1,29 @@
+[
+  {
+    "streamId": "absenceAlertStream",
+    "dataSource": "absenceAlertDataSource",
+    "description": "the data stream for testing absence alert",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "jobID",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "status",
+        "type": "STRING",
+        "defaultValue": "running",
+        "required": true
+      }
+    ]
+  }
+]
\ No newline at end of file