You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/09/02 08:34:42 UTC

incubator-eagle git commit: EAGLE-519: no data alert enhancement

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop b2b16b745 -> ebbaad091


EAGLE-519: no data alert enhancement

Author: Li, Garrett
Reviewer: ralphsu

This closes #413


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/ebbaad09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/ebbaad09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/ebbaad09

Branch: refs/heads/develop
Commit: ebbaad091b797945efa7147698de554a53117955
Parents: b2b16b7
Author: Ralph, Su <su...@gmail.com>
Authored: Fri Sep 2 16:33:23 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Sep 2 16:33:23 2016 +0800

----------------------------------------------------------------------
 .../engine/coordinator/PolicyDefinition.java    |   3 +-
 .../alert/engine/coordinator/StreamColumn.java  |  25 +-
 .../eagle/alert/coordinator/Coordinator.java    |   2 +-
 .../coordinator/impl/GreedyPolicyScheduler.java |   4 +
 .../provider/NodataMetadataGenerator.java       | 343 +++++++++++++++++++
 .../provider/ScheduleContextBuilder.java        |  14 +-
 .../coordinator/trigger/CoordinatorTrigger.java |   2 +-
 .../NodataMetadataGeneratorTest.java            | 103 ++++++
 .../coordinator/ScheduleContextBuilderTest.java |  93 ++++-
 .../TestGreedyScheduleCoordinator.java          | 131 +++++++
 .../src/test/resources/application.conf         |   9 +
 .../src/test/resources/test-application.conf    |   9 +
 .../engine/evaluator/PolicyStreamHandlers.java  |   9 +-
 .../nodata/DistinctValuesInTimeBatchWindow.java | 128 +++++++
 .../nodata/NoDataPolicyTimeBatchHandler.java    | 168 +++++++++
 .../publisher/impl/JsonEventSerializer.java     |   2 +-
 .../AttributeCollectWithDistinctAggregator.java | 124 +++++++
 .../src/main/resources/eagle.siddhiext          |   1 +
 .../TestDistinctValuesInTimeBatchWindow.java    |  89 +++++
 .../TestNoDataPolicyTimeBatchHandler.java       | 158 +++++++++
 .../src/test/resources/eagle.siddhiext          |  18 +
 .../simple/application-integration.conf         |  16 +-
 .../src/test/resources/simple/topologies.json   |  14 +-
 .../src/main/resources/application.conf         |  11 +-
 24 files changed, 1442 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 0dca247..bc389a2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -144,7 +144,8 @@ public class PolicyDefinition implements Serializable{
         if(! (that instanceof PolicyDefinition))
             return false;
         PolicyDefinition another = (PolicyDefinition)that;
-        if(another.name.equals(this.name) &&
+        if(Objects.equals(another.name, this.name) &&
+        		Objects.equals(another.description, this.description) &&
                 CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) &&
                 CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) &&
                 another.definition.equals(this.definition) &&

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index b11729d..c0d355e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -16,15 +16,15 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import javax.xml.bind.annotation.XmlType;
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 
 public class StreamColumn implements Serializable {
     private static final long serialVersionUID = -5457861313624389106L;
@@ -33,13 +33,22 @@ public class StreamColumn implements Serializable {
     private Object defaultValue;
     private boolean required;
     private String description;
+    private String nodataExpression;
 
     public String toString() {
-        return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s]", name, type,
-                defaultValue, required);
+        return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]", 
+        		name, type, defaultValue, required, nodataExpression);
     }
 
-    public String getName() {
+    public String getNodataExpression() {
+		return nodataExpression;
+	}
+
+	public void setNodataExpression(String nodataExpression) {
+		this.nodataExpression = nodataExpression;
+	}
+
+	public String getName() {
         return name;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index f46e4c2..5c455f6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -128,7 +128,7 @@ public class Coordinator {
     		ScheduleState state = null;
     		try {
     			Stopwatch watch = Stopwatch.createStarted();
-    	        IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+    	        IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
     	        TopologyMgmtService mgmtService = new TopologyMgmtService();
     	        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
     	

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index 6c98fa6..ebc533e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -298,6 +298,10 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
      * @return
      */
     private int getQueueSize(int hint) {
+    	if (hint == 0) {
+    		// some policies require single bolt to execute
+    		return 1;
+    	}
         return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
new file mode 100644
index 0000000..67dedeb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class NodataMetadataGenerator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGenerator.class);
+	
+	private static final String NODATA_ALERT_AGGR_STREAM = "nodata_alert_aggregation_stream";
+	private static final String NODATA_ALERT_AGGR_OUTPUT_STREAM = "nodata_alert_aggregation_output_stream";
+	private static final String NODATA_ALERT_AGGR_DATASOURCE_NAME = "nodata_alert_aggregation_ds";
+	private static final String NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME = "nodata_alert_aggregation_output_ds";
+	private static final String NODATA_ALERT_AGGR_TOPIC_NAME = "nodata_alert_aggregation";
+	private static final String NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME = "nodata_alert";
+	
+	private static final String DATASOURCE_TYPE = "KAFKA";
+	private static final String DATASOURCE_SCHEME_CLS = "org.apache.eagle.alert.engine.scheme.JsonScheme";
+	
+	private static final String NODATA_ALERT_AGGR_POLICY_TYPE = "nodataalert";
+	private static final String NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE = "siddhi";
+	
+	private static final String JSON_STRING_STREAM_NAME_SELECTOR_CLS = "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector";
+	private static final String STREAM_TIMESTAMP_COLUMN_NAME = "timestamp";
+	private static final String STREAM_TIMESTAMP_FORMAT = "";
+	
+	private static final String KAFKA_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher";
+	private static final String EMAIL_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher";
+	
+	private static final String PUBLISHMENT_DEDUP_DURATION = "PT0M";
+	private static final String PUBLISHMENT_SERIALIZER = "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer";
+	
+	public NodataMetadataGenerator() {} // no instance state: every input is passed to execute()
+	
+	/**
+	 * Generates nodata alert metadata (aggregation streams, datasources, policies, publishments) for each
+	 * stream that has exactly one column with a non-blank nodataExpression. Entries whose names already
+	 * exist in the given maps are kept as they are; the four maps are modified in place.
+	 */
+	public void execute(Config config, Map<String, StreamDefinition> streamDefinitionsMap,
+			Map<String, Kafka2TupleMetadata> kafkaSources,
+			Map<String, PolicyDefinition> policies, Map<String, Publishment> publishments) {
+		// iterate over a snapshot: streams are added to streamDefinitionsMap inside the loop, which
+		// would break a fail-fast iterator over the live values() view
+		StreamDefinition[] streamDefinitions = streamDefinitionsMap.values().toArray(new StreamDefinition[0]);
+		for (StreamDefinition streamDefinition : streamDefinitions) {
+			StreamColumn columnWithNodataExpression = null;
+			for (StreamColumn column : streamDefinition.getColumns()) {
+				if (StringUtils.isNotBlank(column.getNodataExpression())) {
+					// has nodata alert setting, needs to generate the nodata alert policy
+					if (columnWithNodataExpression != null) {
+						columnWithNodataExpression = null;
+						LOG.warn("Only one column in one stream is allowed to configure nodata alert");
+						break;
+					}
+					columnWithNodataExpression = column;
+				}
+			}
+			if (columnWithNodataExpression != null) {
+				String streamName = streamDefinition.getStreamId();
+
+				// create nodata alert aggr stream
+				if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_STREAM)) {
+					LOG.info("Nodata alert aggregation stream: {} already exists", NODATA_ALERT_AGGR_STREAM);
+				} else {
+					streamDefinitionsMap.put(NODATA_ALERT_AGGR_STREAM, buildAggregationStream());
+					LOG.info("Created nodata alert aggregation stream: {}", NODATA_ALERT_AGGR_STREAM);
+				}
+
+				// create nodata alert aggr output stream
+				if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_OUTPUT_STREAM)) {
+					LOG.info("Nodata alert aggregation output stream: {} already exists", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+				} else {
+					streamDefinitionsMap.put(NODATA_ALERT_AGGR_OUTPUT_STREAM, buildAggregationOutputStream());
+					LOG.info("Created nodata alert aggregation output stream: {}", NODATA_ALERT_AGGR_OUTPUT_STREAM);
+				}
+
+				// create nodata alert data source
+				if (kafkaSources.containsKey(NODATA_ALERT_AGGR_DATASOURCE_NAME)) {
+					LOG.info("Stream: {} nodata alert aggregation datasource: {} already exists",
+							NODATA_ALERT_AGGR_STREAM, NODATA_ALERT_AGGR_DATASOURCE_NAME);
+				} else {
+					kafkaSources.put(NODATA_ALERT_AGGR_DATASOURCE_NAME, buildAggregationDatasource());
+					LOG.info("Created nodata alert aggregation datasource {} for stream {}",
+							NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_STREAM);
+				}
+
+				// create nodata alert aggregation output datasource
+				if (kafkaSources.containsKey(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME)) {
+					LOG.info("Stream: {} nodata alert aggregation output datasource: {} already exists",
+							NODATA_ALERT_AGGR_OUTPUT_STREAM, NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+				} else {
+					kafkaSources.put(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, buildAggregationOutputDatasource());
+					LOG.info("Created nodata alert aggregation output datasource {} for stream {}",
+							NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+				}
+
+				// create nodata alert policy
+				String policyName = streamName + "_nodata_alert";
+				String nodataExpression = columnWithNodataExpression.getNodataExpression();
+				String[] segments = nodataExpression.split(",");
+				long windowPeriodInSeconds = TimePeriodUtils.getSecondsOfPeriod(Period.parse(segments[0]));
+				if (policies.containsKey(policyName)) {
+					LOG.info("Stream: {} nodata alert policy: {} already exists", streamName, policyName);
+				} else {
+					policies.put(policyName, buildDynamicNodataPolicy(
+							streamName, policyName, columnWithNodataExpression.getName(),
+							nodataExpression, Arrays.asList(streamName)));
+					LOG.info("Created nodata alert policy {} with expression {} for stream {}", policyName, nodataExpression, streamName);
+				}
+
+				// create nodata alert aggregation
+				String aggrPolicyName = NODATA_ALERT_AGGR_STREAM + "_policy";
+				if (policies.containsKey(aggrPolicyName)) {
+					LOG.info("Stream: {} nodata alert aggregation policy: {} already exists",
+							NODATA_ALERT_AGGR_OUTPUT_STREAM, aggrPolicyName);
+				} else {
+					policies.put(aggrPolicyName, buildAggregationPolicy(
+							aggrPolicyName, columnWithNodataExpression.getName(), windowPeriodInSeconds));
+					LOG.info("Created nodata alert aggregation policy {} for stream {}", aggrPolicyName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+				}
+
+				// create nodata alert publish
+				String publishmentName = policyName + "_publish";
+				if (publishments.containsKey(publishmentName)) {
+					LOG.info("Stream: {} nodata alert publishment: {} already exists", streamName, publishmentName);
+				} else {
+					String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
+					publishments.put(publishmentName, buildKafkaAlertPublishment(
+							publishmentName, policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
+					publishments.put(publishmentName + "_email", buildEmailAlertPublishment(config,
+							publishmentName + "_email", policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
+					LOG.info("Created nodata alert publishment {} for stream {}", publishmentName, streamName);
+				}
+
+				// create nodata alert aggregation publish
+				String aggrPublishName = aggrPolicyName + "_publish";
+				if (publishments.containsKey(aggrPublishName)) {
+					LOG.info("Stream: {} publishment: {} already exists", NODATA_ALERT_AGGR_STREAM, aggrPublishName);
+				} else {
+					String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
+					publishments.put(aggrPublishName, buildKafkaAlertPublishment(
+							aggrPublishName, aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
+					publishments.put(aggrPublishName + "_email", buildEmailAlertPublishment(config,
+							aggrPublishName + "_email", aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
+					LOG.info("Created nodata alert publishment {} for stream {}", aggrPublishName, NODATA_ALERT_AGGR_STREAM);
+				}
+			}
+		}
+	}
+	
+	/** Kafka datasource metadata for the nodata aggregation topic and stream. */
+	private Kafka2TupleMetadata buildAggregationDatasource() {
+		return buildDatasource(NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_TOPIC_NAME, NODATA_ALERT_AGGR_STREAM);
+	}
+
+	/** Kafka datasource metadata for the nodata aggregation output topic and stream. */
+	private Kafka2TupleMetadata buildAggregationOutputDatasource() {
+		return buildDatasource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+	}
+
+	/**
+	 * Builds a datasource of the shared type/scheme whose stream-name selector is configured with streamName.
+	 *
+	 * @param name       datasource name
+	 * @param topic      topic set on the datasource
+	 * @param streamName value of the userProvidedStreamName selector property
+	 * @return the populated datasource metadata
+	 */
+	private Kafka2TupleMetadata buildDatasource(String name, String topic, String streamName) {
+		Properties selectorProperties = new Properties();
+		selectorProperties.put("userProvidedStreamName", streamName);
+		selectorProperties.put("streamNameFormat", "%s");
+		Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
+		codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
+		codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
+		codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
+		codec.setStreamNameSelectorProp(selectorProperties);
+		Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+		datasource.setName(name);
+		datasource.setType(DATASOURCE_TYPE);
+		datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
+		datasource.setTopic(topic);
+		datasource.setCodec(codec);
+		return datasource;
+	}
+	
+	/** Builds the per-stream nodata policy: grouped by columnName on streamName, output to the aggregation stream. */
+	private PolicyDefinition buildDynamicNodataPolicy(String streamName, String policyName, 
+			String columnName, String expression, List<String> inputStream) {
+		// the raw expression (something like "PT5S,dynamic,1,host") is used as the definition value
+		PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+		definition.setValue(expression);
+		definition.setType(NODATA_ALERT_AGGR_POLICY_TYPE);
+		StreamPartition partition = new StreamPartition();
+		partition.setStreamId(streamName);
+		partition.setColumns(Arrays.asList(columnName));
+		partition.setType(StreamPartition.Type.GROUPBY);
+		PolicyDefinition policy = new PolicyDefinition();
+		policy.setDefinition(definition);
+		policy.setInputStreams(inputStream);
+		policy.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
+		policy.setName(policyName);
+		policy.setDescription(String.format("Nodata alert policy for stream %s", streamName));
+		policy.addPartition(partition);
+		return policy;
+	}
+    
+    /** Builds the siddhi policy that collects distinct columnName values per originalStreamName over a time batch
+     *  of twice windowPeriodInSeconds. NOTE(review): the aggregation stream declares a fixed "host" column - confirm. */
+    private PolicyDefinition buildAggregationPolicy(String policyName, String columnName, 
+            long windowPeriodInSeconds) {
+        String query = String.format(
+                "from %s#window.timeBatch(%s sec) select eagle:collectWithDistinct(%s) as hosts, "
+                + "originalStreamName as streamName group by originalStreamName insert into %s", 
+                NODATA_ALERT_AGGR_STREAM, windowPeriodInSeconds * 2, columnName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
+        LOG.info("Generated SiddhiQL {} for stream: {}", query, NODATA_ALERT_AGGR_STREAM);
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setValue(query);
+        definition.setType(NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE);
+        StreamPartition partition = new StreamPartition();
+        partition.setStreamId(NODATA_ALERT_AGGR_STREAM);
+        partition.setColumns(Arrays.asList(columnName));
+        partition.setType(StreamPartition.Type.GROUPBY);
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setDefinition(definition);
+        policy.setInputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
+        policy.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_OUTPUT_STREAM));
+        policy.setName(policyName);
+        policy.setDescription("Nodata alert aggregation policy, used to merge alerts from multiple bolts");
+        policy.addPartition(partition);
+        policy.setParallelismHint(0);
+        return policy;
+    }
+	
+	/** Builds the Kafka publishment for policyName, configured with the kafka_broker and topic properties. */
+	private Publishment buildKafkaAlertPublishment(String publishmentName, String policyName, String kafkaBroker, String topic) {
+		Map<String, String> publishmentProperties = new HashMap<String, String>();
+		publishmentProperties.put("kafka_broker", kafkaBroker);
+		publishmentProperties.put("topic", topic);
+		return buildPublishment(publishmentName, KAFKA_PUBLISHMENT_TYPE, policyName, publishmentProperties);
+	}
+
+	/** Builds the email publishment for policyName from the email.* config keys; kafkaBroker is accepted but not used. */
+	private Publishment buildEmailAlertPublishment(Config config, 
+			String publishmentName, String policyName, String kafkaBroker, String topic) {
+		Map<String, String> publishmentProperties = new HashMap<String, String>();
+		publishmentProperties.put("subject", String.format("Eagle Alert - %s", topic));
+		publishmentProperties.put("template", "");
+		publishmentProperties.put("sender", config.getString("email.sender"));
+		publishmentProperties.put("recipients", config.getString("email.recipients"));
+		publishmentProperties.put("mail.smtp.host", config.getString("email.mailSmtpHost"));
+		publishmentProperties.put("mail.smtp.port", config.getString("email.mailSmtpPort"));
+		publishmentProperties.put("connection", "plaintext");
+		return buildPublishment(publishmentName, EMAIL_PUBLISHMENT_TYPE, policyName, publishmentProperties);
+	}
+
+	/** Shared setup of both publishments: name, type, single policy id, dedup interval, properties and serializer. */
+	private Publishment buildPublishment(String name, String type, String policyName, Map<String, String> properties) {
+		Publishment publishment = new Publishment();
+		publishment.setName(name);
+		publishment.setType(type);
+		publishment.setPolicyIds(Arrays.asList(policyName));
+		publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
+		publishment.setProperties(properties);
+		publishment.setSerializer(PUBLISHMENT_SERIALIZER);
+		return publishment;
+	}
+	
+	/** Describes the nodata aggregation stream: timestamp (long), host and originalStreamName (string) columns. */
+	private StreamDefinition buildAggregationStream() {
+		// columns first, then the stream definition that carries them
+		StreamColumn timestamp = new StreamColumn();
+		timestamp.setName("timestamp");
+		timestamp.setType(StreamColumn.Type.LONG);
+		StreamColumn host = new StreamColumn();
+		host.setName("host");
+		host.setType(StreamColumn.Type.STRING);
+		StreamColumn originalStreamName = new StreamColumn();
+		originalStreamName.setName("originalStreamName");
+		originalStreamName.setType(StreamColumn.Type.STRING);
+
+		StreamDefinition definition = new StreamDefinition();
+		definition.setColumns(Arrays.asList(timestamp, host, originalStreamName));
+		definition.setDataSource(NODATA_ALERT_AGGR_DATASOURCE_NAME);
+		definition.setStreamId(NODATA_ALERT_AGGR_STREAM);
+		definition.setDescription("Nodata alert aggregation stream");
+		return definition;
+	}
+	
+	/** Describes the nodata aggregation output stream: hosts and streamName string columns. */
+	private StreamDefinition buildAggregationOutputStream() {
+		StreamColumn hosts = new StreamColumn();
+		hosts.setName("hosts");
+		hosts.setType(StreamColumn.Type.STRING);
+		StreamColumn streamName = new StreamColumn();
+		streamName.setName("streamName");
+		streamName.setType(StreamColumn.Type.STRING);
+
+		StreamDefinition definition = new StreamDefinition();
+		definition.setColumns(Arrays.asList(hosts, streamName));
+		definition.setDataSource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
+		definition.setStreamId(NODATA_ALERT_AGGR_OUTPUT_STREAM);
+		definition.setDescription("Nodata alert aggregation output stream");
+		return definition;
+	}
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index 9013f04..dd38395 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -61,6 +61,7 @@ public class ScheduleContextBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class);
     private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
 
+    private Config config;
     private IMetadataServiceClient client;
 
     private Map<String, Topology> topologies;
@@ -73,10 +74,12 @@ public class ScheduleContextBuilder {
     private Map<String, TopologyUsage> usages;
 
     public ScheduleContextBuilder(Config config) {
+    	this.config = config;
         client = new MetadataServiceClientImpl(config);
     }
 
-    public ScheduleContextBuilder(IMetadataServiceClient client) {
+    public ScheduleContextBuilder(Config config, IMetadataServiceClient client) {
+    	this.config = config;
         this.client = client;
     }
 
@@ -89,10 +92,13 @@ public class ScheduleContextBuilder {
         topologies = listToMap(client.listTopologies());
         kafkaSources = listToMap(client.listDataSources());
         // filter out disabled policies
-        policies = listToMap(client.listPolicies().stream().filter(
-        		(t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList()));
+        List<PolicyDefinition> enabledPolicies = client.listPolicies().stream().filter(
+        		(t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList());
+        policies = listToMap(enabledPolicies);
         publishments = listToMap(client.listPublishment());
         streamDefinitions = listToMap(client.listStreams());
+        // generate data sources, policies, publishments for nodata alert
+        new NodataMetadataGenerator().execute(config, streamDefinitions, kafkaSources, policies, publishments);
         
         // TODO: See ScheduleState comments on how to improve the storage
         ScheduleState state = client.getVersionedSpec();
@@ -114,7 +120,7 @@ public class ScheduleContextBuilder {
         return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
                 streamDefinitions, monitoredStreamMap, usages);
     }
-
+    
     /**
      * 1.
      * <pre>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
index c489a0e..4e17179 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
@@ -64,7 +64,7 @@ public class CoordinatorTrigger implements Runnable {
                 Stopwatch watch = Stopwatch.createStarted();
 
                 // schedule
-                IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+                IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
                 TopologyMgmtService mgmtService = new TopologyMgmtService();
                 IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
new file mode 100644
index 0000000..43dc9c1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordinator.provider.NodataMetadataGenerator;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class NodataMetadataGeneratorTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGeneratorTest.class);
+
+    /** The "coordinator" section of the default test configuration; handed to the generator. */
+    Config config = ConfigFactory.load().getConfig("coordinator");
+    private NodataMetadataGenerator generator;
+
+    @Before
+    public void setup() {
+        generator = new NodataMetadataGenerator();
+    }
+
+    /**
+     * Feeds a single stream whose "host" column carries a no-data expression to the generator and
+     * checks how many data sources, policies and publishments it put into the (initially empty) maps.
+     */
+    @Test
+    public void testNormal() throws Exception {
+        StreamDefinition stream = createStreamDefinitionWithNodataAlert();
+        Map<String, StreamDefinition> streams = new HashMap<>();
+        streams.put(stream.getStreamId(), stream);
+
+        Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<>();
+        Map<String, PolicyDefinition> policies = new HashMap<>();
+        Map<String, Publishment> publishments = new HashMap<>();
+
+        generator.execute(config, streams, kafkaSources, policies, publishments);
+
+        Assert.assertEquals(2, kafkaSources.size());
+        for (Map.Entry<String, Kafka2TupleMetadata> source : kafkaSources.entrySet()) {
+            LOG.info("KafkaSources > {}: {}", source.getKey(), ToStringBuilder.reflectionToString(source.getValue()));
+        }
+
+        Assert.assertEquals(2, policies.size());
+        for (Map.Entry<String, PolicyDefinition> policy : policies.entrySet()) {
+            LOG.info("Policies > {}: {}", policy.getKey(), ToStringBuilder.reflectionToString(policy.getValue()));
+        }
+
+        Assert.assertEquals(4, publishments.size());
+        for (Map.Entry<String, Publishment> publishment : publishments.entrySet()) {
+            LOG.info("Publishments > {}: {}", publishment.getKey(), ToStringBuilder.reflectionToString(publishment.getValue()));
+        }
+    }
+
+    /**
+     * Builds the fixture stream: columns timestamp (LONG), host (STRING, with the no-data
+     * expression) and value (DOUBLE), bound to "testDataSource" under the id "testStreamId".
+     */
+    private StreamDefinition createStreamDefinitionWithNodataAlert() {
+        StreamColumn timestamp = new StreamColumn();
+        timestamp.setName("timestamp");
+        timestamp.setType(StreamColumn.Type.LONG);
+
+        StreamColumn host = new StreamColumn();
+        host.setName("host");
+        host.setType(StreamColumn.Type.STRING);
+        host.setNodataExpression("PT1M,dynamic,1,host");
+
+        StreamColumn value = new StreamColumn();
+        value.setName("value");
+        value.setType(StreamColumn.Type.DOUBLE);
+
+        StreamDefinition definition = new StreamDefinition();
+        definition.setColumns(Arrays.asList(timestamp, host, value));
+        definition.setDataSource("testDataSource");
+        definition.setStreamId("testStreamId");
+        return definition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
index ed9d7b7..84153f6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
@@ -48,17 +49,22 @@ import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 /**
  * @since May 5, 2016
  *
  */
 public class ScheduleContextBuilderTest {
 
+	Config config = ConfigFactory.load().getConfig("coordinator");
+	
     @Test
     public void test() {
         InMemMetadataServiceClient client = getSampleMetadataService();
 
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
 
         IScheduleContext context = builder.buildContext();
 
@@ -84,7 +90,7 @@ public class ScheduleContextBuilderTest {
     @Test
     public void test_remove_policy() {
         InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
 
         PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
 
@@ -105,7 +111,7 @@ public class ScheduleContextBuilderTest {
     @Test
     public void test_changed_policy_partition() {
         InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
         PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
 
         IScheduleContext context = builder.buildContext();
@@ -143,7 +149,7 @@ public class ScheduleContextBuilderTest {
     @Test
     public void test_changed_policy_parallelism() {
         InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
         PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
 
         IScheduleContext context = builder.buildContext();
@@ -171,7 +177,7 @@ public class ScheduleContextBuilderTest {
     @Test
     public void test_changed_policy_definition() {
         InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
         PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
 
         IScheduleContext context = builder.buildContext();
@@ -189,11 +195,59 @@ public class ScheduleContextBuilderTest {
         // just to make sure queueNew is present
         Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
     }
+    
+    /**
+     * Builds a schedule context from metadata whose stream column carries a no-data expression and
+     * checks that the derived no-data policy, aggregation policy, aggregation data source and
+     * publishment are all present in that context.
+     */
+    @Test
+    public void test_stream_noalert_policies_generation() throws Exception {
+        InMemMetadataServiceClient metadataClient = getSampleMetadataServiceWithNodataAlert();
+        IScheduleContext context = new ScheduleContextBuilder(config, metadataClient).buildContext();
+
+        // The generated policy ids are only known by their suffix, so scan for them.
+        PolicyDefinition nodataPolicy = null;
+        PolicyDefinition aggregationPolicy = null;
+        for (Entry<String, PolicyDefinition> candidate : context.getPolicies().entrySet()) {
+            String policyId = candidate.getKey();
+            if (policyId.endsWith("_nodata_alert")) {
+                nodataPolicy = candidate.getValue();
+            } else if (policyId.endsWith("_aggregation_stream_policy")) {
+                aggregationPolicy = candidate.getValue();
+            }
+        }
+        Assert.assertEquals(3, context.getPolicies().size());
+
+        Assert.assertNotNull(nodataPolicy);
+        Assert.assertEquals("nodataalert", nodataPolicy.getDefinition().getType());
+        Assert.assertEquals("PT5S,dynamic,1," + COL1, nodataPolicy.getDefinition().getValue());
+
+        Assert.assertNotNull(aggregationPolicy);
+        Assert.assertEquals("siddhi", aggregationPolicy.getDefinition().getType());
+
+        // Data source and publishment ids are known exactly, so look them up by key.
+        Kafka2TupleMetadata datasource = context.getDataSourceMetadata().get("nodata_alert_aggregation_ds");
+        Assert.assertNotNull(datasource);
+
+        Publishment publishment = context.getPublishments().get(nodataPolicy.getName() + "_publish");
+        Assert.assertNotNull(publishment);
+    }
 
     @Test
     public void test_renamed_topologies() {
         InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(config, client);
 
         IScheduleContext context = builder.buildContext();
         Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
@@ -226,6 +280,33 @@ public class ScheduleContextBuilderTest {
         client.addScheduleState(createScheduleState());
         return client;
     }
+    
+    /**
+     * Creates an in-memory metadata service populated with the sample topology, data source, policy,
+     * publishment and schedule state, plus a stream definition whose column has a no-data expression.
+     */
+    public static InMemMetadataServiceClient getSampleMetadataServiceWithNodataAlert() {
+        InMemMetadataServiceClient metadata = new InMemMetadataServiceClient();
+        metadata.addTopology(createSampleTopology());
+        metadata.addDataSource(createKafka2TupleMetadata());
+        metadata.addPolicy(createPolicy());
+        metadata.addPublishment(createPublishment());
+        metadata.addStreamDefinition(createStreamDefinitionWithNodataAlert());
+        metadata.addScheduleState(createScheduleState());
+        return metadata;
+    }
+    
+    /**
+     * Creates the test stream definition with a single required STRING column (COL1) that carries
+     * the no-data expression "PT5S,dynamic,1,&lt;COL1&gt;".
+     */
+    private static StreamDefinition createStreamDefinitionWithNodataAlert() {
+        StreamColumn nodataColumn = new StreamColumn();
+        nodataColumn.setName(COL1);
+        nodataColumn.setRequired(true);
+        nodataColumn.setType(Type.STRING);
+        nodataColumn.setNodataExpression("PT5S,dynamic,1," + COL1);
+
+        StreamDefinition definition = new StreamDefinition();
+        definition.setStreamId(TEST_STREAM_DEF_1);
+        definition.setDataSource(TEST_DATASOURCE_1);
+        definition.getColumns().add(nodataColumn);
+        return definition;
+    }
+
 
     private static ScheduleState createScheduleState() {
         ScheduleState ss = new ScheduleState();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
new file mode 100644
index 0000000..a86b13a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
+import org.apache.eagle.alert.utils.ZookeeperEmbedded;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestGreedyScheduleCoordinator {
+
+    /** Flags shared between a scheduling thread and the runnable it submits for exclusive execution. */
+    public static class ScheduleZkState {
+        volatile boolean scheduleAcquired = false;
+        volatile boolean scheduleCompleted = false;
+    }
+
+    /** Minimal stand-in for a coordinator that serializes schedule() calls through ExclusiveExecutor. */
+    public static class GreedyScheduleCoordinator {
+
+        /** Number of 2-second polls before giving up on the lock (about 3 minutes). */
+        private static final int MAX_WAIT_TIMES = 90;
+
+        /**
+         * Submits a runnable under the "/alert/test" exclusive path, waits until that runnable
+         * reports that it is running, and then returns {@code input}.
+         *
+         * @param input value echoed back once the exclusive section has been entered
+         * @return {@code input}
+         * @throws RuntimeException if the exclusive section was not entered in time or the waiting
+         *         thread was interrupted
+         */
+        public int schedule(int input) {
+            final ScheduleZkState scheduleZkState = new ScheduleZkState();
+            ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+                @Override
+                public void run() throws Exception {
+                    scheduleZkState.scheduleAcquired = true;
+
+                    // keep the exclusive section occupied until the caller is done
+                    while (!scheduleZkState.scheduleCompleted) {
+                        Thread.sleep(2000);
+                    }
+                }
+            };
+            ExclusiveExecutor.execute("/alert/test", exclusiveRunnable);
+            try {
+                for (int waitTimes = 0; waitTimes < MAX_WAIT_TIMES; waitTimes++) {
+                    if (scheduleZkState.scheduleAcquired) {
+                        return input;
+                    }
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        // keep the interrupt visible to the caller and stop waiting
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+            } finally {
+                // Always release the runnable, including on timeout or interrupt; otherwise a
+                // runnable that enters the exclusive section later would never leave it.
+                scheduleZkState.scheduleCompleted = true;
+            }
+            throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later");
+        }
+
+    }
+
+    ZookeeperEmbedded zkEmbed;
+
+    @Before
+    public void setUp() throws Exception {
+        zkEmbed = new ZookeeperEmbedded(2181);
+        zkEmbed.start();
+
+        // give the embedded server a moment to come up
+        Thread.sleep(2000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // setUp may have failed before the server was created
+        if (zkEmbed != null) {
+            zkEmbed.shutdown();
+        }
+    }
+
+    /**
+     * Starts three threads that each call schedule() with a different input and print the result,
+     * then waits 15 seconds for them to work through the exclusive section.
+     */
+    @Test
+    public void testMain() throws Exception {
+        final GreedyScheduleCoordinator coordinator = new GreedyScheduleCoordinator();
+
+        for (int i = 1; i <= 3; i++) {
+            final int input = i;
+            new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    System.out.println("output: " + coordinator.schedule(input));
+
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+            }).start();
+        }
+
+        Thread.sleep(15000);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
index 363e661..1ef71a0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application.conf
@@ -37,6 +37,15 @@
 		"metadataDynamicCheck" : {
 			"initDelayMillis" : 1000,
 			"delayMillis" : 30000
+		},
+		"kafkaProducer": {
+	  		"bootstrapServers": "localhost:9092"
+	  	},
+		"email": {
+			"sender": "eagle@eagle.com",
+			"recipients": "test@eagle.com",
+			"mailSmtpHost": "test.eagle.com",
+			"mailSmtpPort": "25"
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
index 361d6d1..63be6a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
@@ -37,6 +37,15 @@
 		"metadataDynamicCheck" : {
 			"initDelayMillis" : 1000,
 			"delayMillis" : 30000
+		},
+		"kafkaProducer": {
+	  		"bootstrapServers": "localhost:9092"
+	  	},
+		"email": {
+			"sender": "eagle@eagle.com",
+			"recipients": "test@eagle.com",
+			"mailSmtpHost": "test.eagle.com",
+			"mailSmtpPort": "25"
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
index ef9caf0..1e7aacc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java
@@ -16,17 +16,17 @@
  */
 package org.apache.eagle.alert.engine.evaluator;
 
+import java.util.Map;
+
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyStateHandler;
-import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 /**
  * TODO/FIXME: to support multiple stage definition in single policy. The methods in this class is not good to understand now.(Hard code of 0/1).
  */
@@ -42,7 +42,8 @@ public class PolicyStreamHandlers {
         if (SIDDHI_ENGINE.equals(definition.getType())) {
             return new SiddhiPolicyHandler(sds, 0);// // FIXME: 8/2/16 
         } else if (NO_DATA_ALERT_ENGINE.equals(definition.getType())) {
-            return new NoDataPolicyHandler(sds);
+        	// no data for an entire stream won't trigger gap alert  (use local time & batch window instead)
+        	return new NoDataPolicyTimeBatchHandler(sds);
         } else if (ABSENCE_ALERT_ENGINE.equals(definition.getType())) {
             return new AbsencePolicyHandler(sds);
         } else if (CUSTOMIZED_ENGINE.equals(definition.getType())) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
new file mode 100644
index 0000000..357504e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/DistinctValuesInTimeBatchWindow.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Time-batch window that collects the distinct values seen during each window period and, at the
+ * end of every period, asks the owning handler to compare them with the expected set (wisb).
+ *
+ * <p>The window clock is the local wall clock and starts with the first event passed to
+ * {@link #send}. Evaluation runs on a private single-thread scheduler; all access to the value map
+ * is guarded by the map's own monitor.
+ */
+public class DistinctValuesInTimeBatchWindow {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistinctValuesInTimeBatchWindow.class);
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+    // wisb (what it should be): the expected full set of values of the watched columns
+    @SuppressWarnings("rawtypes")
+    private volatile Set wisb = new HashSet();
+
+    private final NoDataPolicyTimeBatchHandler handler;
+
+    /**
+     * map from value to max timestamp for this value (wiri, "what it really is", of the current
+     * batch); guarded by synchronizing on the map itself
+     */
+    private final Map<Object, Long> valueMaxTimeMap = new HashMap<>();
+
+    // written by the sending thread and by the scheduler thread, hence volatile
+    private volatile long startTime = -1;
+    private volatile long nextEmitTime = -1;
+    private final long timeInMilliSeconds;
+
+    /**
+     * @param handler            receiver of the end-of-window comparison
+     * @param timeInMilliSeconds length of one batch window in milliseconds
+     * @param wisb               initial expected value set; when {@code null} the window starts with
+     *                           an empty set and learns it from the first non-empty batch
+     */
+    public DistinctValuesInTimeBatchWindow(NoDataPolicyTimeBatchHandler handler,
+            long timeInMilliSeconds, @SuppressWarnings("rawtypes") Set wisb) {
+        this.handler = handler;
+        this.timeInMilliSeconds = timeInMilliSeconds;
+        if (wisb != null) {
+            this.wisb = wisb;
+        }
+    }
+
+    /**
+     * Returns the live map of values seen in the current batch. The map is mutated concurrently by
+     * {@link #send} and by the scheduler thread; callers that iterate it should synchronize on the
+     * returned map.
+     */
+    public Map<Object, Long> distinctValues() {
+        return valueMaxTimeMap;
+    }
+
+    /**
+     * Records {@code value} as seen at {@code timestamp}. The first call also starts the periodic
+     * evaluation task, which keeps a reference to that first {@code event} and passes it to the
+     * handler on every emission.
+     *
+     * @param event     the event carrying the value
+     * @param value     the distinct key to record
+     * @param timestamp time to associate with the value; older-or-equal timestamps are ignored
+     */
+    public void send(StreamEvent event, Object value, long timestamp) {
+        synchronized (this) {
+            if (startTime < 0) {
+                startTime = System.currentTimeMillis();
+
+                // Check twice per window; scheduleAtFixedRate rejects a period <= 0, which the
+                // plain division would produce for windows shorter than 2 ms.
+                long checkPeriod = Math.max(1L, timeInMilliSeconds / 2);
+                scheduler.scheduleAtFixedRate(new Runnable() {
+                    @Override
+                    public void run() {
+                        emitIfWindowElapsed(event);
+                    }
+                }, 0, checkPeriod, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        // Same monitor as the scheduler thread uses, so a put can never interleave with the
+        // compare/clear step or corrupt the HashMap.
+        synchronized (valueMaxTimeMap) {
+            Long oldTime = valueMaxTimeMap.get(value);
+            if (oldTime != null && oldTime >= timestamp) {
+                // no any effect as the new timestamp is equal or even less than
+                // old timestamp
+                return;
+            }
+            // update new timestamp in valueMaxTimeMap
+            valueMaxTimeMap.put(value, timestamp);
+
+            // per-event message, so debug rather than info
+            LOG.debug("sent: {} with start: {}/next: {}", valueMaxTimeMap.keySet(), startTime, nextEmitTime);
+        }
+    }
+
+    /**
+     * Stops the periodic evaluation task. Without this call the scheduler thread stays alive for
+     * the lifetime of the JVM.
+     */
+    public void close() {
+        scheduler.shutdownNow();
+    }
+
+    /**
+     * Scheduler tick: once the current window has elapsed, hands wisb and the values actually seen
+     * to the handler, adopts the seen values as the new wisb (unless nothing was seen) and starts
+     * the next batch.
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void emitIfWindowElapsed(StreamEvent event) {
+        try {
+            synchronized (valueMaxTimeMap) {
+                LOG.info("{}/{}: {}", startTime, nextEmitTime, valueMaxTimeMap.keySet());
+
+                if (nextEmitTime < 0) {
+                    nextEmitTime = startTime + timeInMilliSeconds;
+                }
+                if (System.currentTimeMillis() <= nextEmitTime) {
+                    return;
+                }
+                startTime = nextEmitTime;
+                nextEmitTime = nextEmitTime + timeInMilliSeconds;
+
+                // alert
+                Set wiri = valueMaxTimeMap.keySet();
+                handler.compareAndEmit(wisb, wiri, event);
+                LOG.info("alert for wiri: {} compares to wisb: {}", wiri, wisb);
+
+                if (wiri.size() > 0) {
+                    wisb = new HashSet(wiri);
+                }
+                valueMaxTimeMap.clear();
+                LOG.info("Clear wiri & update wisb to {}", wisb);
+            }
+        } catch (Throwable t) {
+            // an exception escaping the task would cancel all further executions
+            LOG.error("failed to run batch window for gap alert", t);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
new file mode 100644
index 0000000..741fce4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.evaluator.nodata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.apache.storm.guava.base.Joiner;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * No-data ("gap") alert handler that works on local time and a time-batch window: the values of
+ * the configured key columns seen in each window are compared with the expected set, and one alert
+ * is emitted for every expected value that did not show up.
+ */
+public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
+    private Map<String, StreamDefinition> sds;
+
+    private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
+    // reuse PolicyDefinition.defintion.value field to store full set of values
+    // separated by comma
+    private volatile PolicyDefinition policyDef;
+    private volatile Collector<AlertStreamEvent> collector;
+    private volatile PolicyHandlerContext context;
+    private volatile NoDataWisbType wisbType;
+    private volatile DistinctValuesInTimeBatchWindow distinctWindow;
+
+    /**
+     * @param sds stream definitions by stream id; consulted for the policy's input and output streams
+     */
+    public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) {
+        this.sds = sds;
+    }
+
+    /**
+     * Parses the policy value and creates the batch window.
+     *
+     * <p>The policy value is expected to be
+     * "windowPeriod,type,numOfFields,f1_name,f2_name,...[,f1_value,f2_value,...]".
+     *
+     * @throws IllegalArgumentException if the policy does not have exactly one input and one output
+     *         stream, the input stream is not defined, or the policy value is malformed
+     */
+    @Override
+    public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
+        this.collector = collector;
+        this.context = context;
+        this.policyDef = context.getPolicyDefinition();
+        List<String> inputStreams = policyDef.getInputStreams();
+        // validate inputStreams has to contain only one stream
+        if (inputStreams.size() != 1) {
+            throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
+        }
+        // validate outputStream has to contain only one stream
+        if (policyDef.getOutputStreams().size() != 1) {
+            throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
+        }
+
+        String is = inputStreams.get(0);
+        StreamDefinition sd = sds.get(is);
+        if (sd == null) {
+            throw new IllegalArgumentException("no stream definition found for input stream " + is);
+        }
+
+        String policyValue = policyDef.getDefinition().getValue();
+        if (policyValue == null) {
+            throw new IllegalArgumentException("no data alert policy value is missing");
+        }
+        String[] segments = policyValue.split(",");
+        if (segments.length < 3) {
+            throw new IllegalArgumentException(
+                "no data alert policy value has to start with windowPeriod,type,numOfFields but was: " + policyValue);
+        }
+        int numOfFields = Integer.parseInt(segments[2]);
+        if (numOfFields < 0 || segments.length < 3 + numOfFields) {
+            throw new IllegalArgumentException(
+                "no data alert policy value declares " + numOfFields + " fields but does not list them: " + policyValue);
+        }
+
+        // populate wisb field names; built locally so that a repeated prepare does not accumulate
+        List<Integer> fieldIndices = new ArrayList<>();
+        for (int i = 3; i < 3 + numOfFields; i++) {
+            String fn = segments[i];
+            fieldIndices.add(sd.getColumnIndex(fn));
+        }
+        this.wisbFieldIndices = fieldIndices;
+
+        this.wisbType = NoDataWisbType.valueOf(segments[1]);
+        // for provided wisb values, need to parse, for dynamic wisb values, it
+        // is computed through a window
+        @SuppressWarnings("rawtypes")
+        Set wisbValues = null;
+        if (wisbType == NoDataWisbType.provided) {
+            wisbValues = new NoDataWisbProvidedParser().parse(segments);
+        }
+        long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
+        distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues);
+    }
+
+    /**
+     * Extracts the key columns of the event (as strings) and records them in the window using the
+     * local clock.
+     */
+    @Override
+    public void send(StreamEvent event) throws Exception {
+        Object[] data = event.getData();
+
+        List<Object> columnValues = new ArrayList<>();
+        for (int fieldIndex : wisbFieldIndices) {
+            Object o = data[fieldIndex];
+            // convert value to string
+            columnValues.add(o.toString());
+        }
+        // use local timestamp rather than event timestamp
+        distinctWindow.send(event, columnValues, System.currentTimeMillis());
+        LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues());
+    }
+
+    /**
+     * Emits one alert on the policy's output stream for every element of {@code wisb} that is not
+     * in {@code wiri}.
+     *
+     * @param wisb  expected values; each element is read as a List whose first entry is the host
+     * @param wiri  values actually seen in the window
+     * @param event source of the original stream id and of the alert timestamp
+     */
+    @SuppressWarnings("rawtypes")
+    public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
+        // compare with wisbValues if wisbValues are already there for dynamic
+        // type
+        Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
+        LOG.debug("nodatavalues:{}, wisb: {}, wiri: {}", noDataValues, wisb, wiri);
+        if (noDataValues == null || noDataValues.isEmpty()) {
+            return;
+        }
+        LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb);
+
+        String is = policyDef.getOutputStreams().get(0);
+        StreamDefinition sd = sds.get(is);
+        if (sd == null) {
+            LOG.error("no stream definition found for output stream {}, no data alert is dropped", is);
+            return;
+        }
+        int timestampIndex = sd.getColumnIndex("timestamp");
+        int hostIndex = sd.getColumnIndex("host");
+        int originalStreamNameIndex = sd.getColumnIndex("originalStreamName");
+        int columnCount = sd.getColumns().size();
+
+        for (Object one : noDataValues) {
+            Object[] triggerEvent = new Object[columnCount];
+            for (int i = 0; i < columnCount; i++) {
+                if (i == timestampIndex) {
+                    triggerEvent[i] = System.currentTimeMillis();
+                } else if (i == hostIndex) {
+                    triggerEvent[i] = ((List) one).get(0);
+                } else if (i == originalStreamNameIndex) {
+                    triggerEvent[i] = event.getStreamId();
+                } else {
+                    triggerEvent[i] = sd.getColumns().get(i).getDefaultValue();
+                }
+            }
+            AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent);
+            // useForNull: a plain Joiner throws on null elements (e.g. a column without a default
+            // value), which would abort here before the alert is emitted
+            LOG.info("Nodata alert {} generated and will be emitted",
+                Joiner.on(",").useForNull("null").join(triggerEvent));
+            collector.emit(alertEvent);
+        }
+    }
+
+    /** Wraps the generated row into an alert event on the policy's output stream. */
+    private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setTimestamp(timestamp);
+        event.setData(triggerEvent);
+        event.setStreamId(policyDef.getOutputStreams().get(0));
+        event.setPolicy(context.getPolicyDefinition());
+        if (this.context.getPolicyEvaluator() != null) {
+            event.setCreatedBy(context.getPolicyEvaluator().getName());
+        }
+        event.setCreatedTime(System.currentTimeMillis());
+        event.setSchema(sd);
+        return event;
+    }
+
+    /** No resources are released here. */
+    @Override
+    public void close() throws Exception {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
index bf2a954..f30bf8f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -43,7 +43,7 @@ public class JsonEventSerializer implements IEventSerializer {
     public Object serialize(AlertStreamEvent event) {
         String result = streamEventToJson(event);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("serialized alert event : ", result);
+            LOG.debug("serialized alert event : {}", result);
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
new file mode 100644
index 0000000..43400c7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.siddhi.extension;
+
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Siddhi attribute aggregator that collects the distinct values it has been given.
+ *
+ * <p>Values are kept in a list, oldest addition first. Adding a value that is already
+ * present moves it to the end of the list instead of storing it twice. Every add/remove
+ * call returns an immutable snapshot of the current list.
+ *
+ * <p>NOTE(review): Guava's {@code ImmutableList.copyOf} rejects {@code null} elements, so a
+ * {@code null} attribute value would make the snapshot fail; confirm whether nulls can
+ * reach this aggregator.
+ */
+public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectWithDistinctAggregator.class);
+
+    /** Distinct values collected so far, oldest addition first. */
+    private LinkedList<Object> value;
+
+    public AttributeCollectWithDistinctAggregator() {
+        value = new LinkedList<Object>();
+    }
+
+    @Override
+    public void start() {
+        // nothing to start
+    }
+
+    @Override
+    public void stop() {
+        // nothing to stop
+    }
+
+    /** Returns the collected values as an array. */
+    @Override
+    public Object[] currentState() {
+        return value.toArray();
+    }
+
+    /** Replaces the collected values with {@code arg0}; a {@code null} array yields an empty list. */
+    @Override
+    public void restoreState(Object[] arg0) {
+        value = new LinkedList<Object>();
+        if (arg0 != null) {
+            for (Object o : arg0) {
+                value.add(o);
+            }
+        }
+    }
+
+    @Override
+    public Type getReturnType() {
+        return Attribute.Type.OBJECT;
+    }
+
+    @Override
+    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
+        // TODO: Support max of elements?
+    }
+
+    /**
+     * Adds a single value, moving it to the end of the list if it was already collected.
+     *
+     * @return an immutable snapshot of the collected values
+     */
+    @Override
+    public Object processAdd(Object arg0) {
+        // AttributeAggregator.process is already synchronized
+        value.remove(arg0); // drops the earlier occurrence, if any
+        value.add(arg0);
+        LOG.debug("{}: current values are : {}", "processAdd", value);
+        return ImmutableList.copyOf(value);
+    }
+
+    /**
+     * Adds a tuple of values, moving it to the end of the list if an element-wise equal
+     * tuple was already collected.
+     *
+     * @return an immutable snapshot of the collected values
+     */
+    @Override
+    public Object processAdd(Object[] arg0) {
+        // AttributeAggregator.process is already synchronized
+        removeArray(arg0);
+        value.add(arg0);
+        LOG.debug("{}: current values are : {}", "processAdd", value);
+        return ImmutableList.copyOf(value);
+    }
+
+    /**
+     * Removes a single value if present. NOTICE: linear scan, not O(1).
+     *
+     * @return an immutable snapshot of the collected values
+     */
+    @Override
+    public Object processRemove(Object arg0) {
+        value.remove(arg0);
+        LOG.debug("{}: current values are : {}", "processRemove", value);
+        return ImmutableList.copyOf(value);
+    }
+
+    /**
+     * Removes the tuple that is element-wise equal to {@code arg0}, if present.
+     * NOTICE: linear scan, not O(1).
+     *
+     * @return an immutable snapshot of the collected values
+     */
+    @Override
+    public Object processRemove(Object[] arg0) {
+        removeArray(arg0);
+        LOG.debug("{}: current values are : {}", "processRemove", value);
+        return ImmutableList.copyOf(value);
+    }
+
+    /**
+     * Clears the collected values.
+     *
+     * @return an empty immutable list, so callers never hold the live internal list
+     */
+    @Override
+    public Object reset() {
+        value.clear();
+        return ImmutableList.copyOf(value);
+    }
+
+    /**
+     * Removes the first stored array whose elements equal those of {@code target}.
+     * Arrays do not override {@code equals}, so {@code LinkedList.remove(Object)} would
+     * only ever match the very same array instance.
+     */
+    private void removeArray(Object[] target) {
+        for (java.util.Iterator<Object> it = value.iterator(); it.hasNext();) {
+            Object candidate = it.next();
+            if (candidate instanceof Object[] && java.util.Arrays.equals((Object[]) candidate, target)) {
+                it.remove();
+                return;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
index 4ce9805..16569a4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
@@ -16,3 +16,4 @@
 #
 
 collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
+collectWithDistinct=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectWithDistinctAggregator
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
new file mode 100644
index 0000000..72ef02b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeBatchWindow;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Drives a {@link DistinctValuesInTimeBatchWindow} with a mocked
+ * {@link NoDataPolicyTimeBatchHandler} and counts how often the window calls back
+ * into {@code compareAndEmit}.
+ */
+public class TestDistinctValuesInTimeBatchWindow {
+
+	private static final String inputStream = "testInputStream";
+
+	private NoDataPolicyTimeBatchHandler handler;
+
+	@Before
+	public void setup() {
+		handler = mock(NoDataPolicyTimeBatchHandler.class);
+	}
+
+	@After
+	public void teardown() {
+		// nothing to clean up
+	}
+
+	/**
+	 * Sends events for two hosts with pauses in between, then expects exactly three
+	 * {@code compareAndEmit} callbacks on the mocked handler.
+	 */
+	@Test
+	public void testNormal() throws Exception {
+		final int windowMillis = 5 * 1000;
+		// wisb is null since it is dynamic mode
+		DistinctValuesInTimeBatchWindow batchWindow =
+				new DistinctValuesInTimeBatchWindow(handler, windowMillis, null);
+
+		// every event carries this same event time
+		final long eventTime = System.currentTimeMillis();
+
+		sendEventToWindow(batchWindow, eventTime, "host1", 95.5);
+		Thread.sleep(6000);
+
+		sendEventToWindow(batchWindow, eventTime, "host1", 91.0);
+		sendEventToWindow(batchWindow, eventTime, "host2", 95.5);
+		sendEventToWindow(batchWindow, eventTime, "host2", 97.1);
+		Thread.sleep(3000);
+
+		sendEventToWindow(batchWindow, eventTime, "host1", 90.7);
+		Thread.sleep(4000);
+
+		sendEventToWindow(batchWindow, eventTime, "host1", 90.7);
+		Thread.sleep(3000);
+
+		verify(handler, times(3)).compareAndEmit(anyObject(), anyObject(), anyObject());
+	}
+
+	/** Builds an event for {@code host} and pushes it into the window, passing the host and {@code ts} alongside it. */
+	private void sendEventToWindow(DistinctValuesInTimeBatchWindow window, long ts, String host, double value) {
+		StreamEvent streamEvent = buildStreamEvent(ts, host, value);
+		window.send(streamEvent, host, ts);
+	}
+
+	/** Creates an input-stream event whose data is {@code [ts, host, value]}. */
+	private StreamEvent buildStreamEvent(long ts, String host, double value) {
+		Object[] payload = new Object[] { ts, host, value };
+		StreamEvent streamEvent = new StreamEvent();
+		streamEvent.setStreamId(inputStream);
+		streamEvent.setTimestamp(ts);
+		streamEvent.setData(payload);
+		return streamEvent;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
new file mode 100644
index 0000000..02d19b4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Feeds a {@link NoDataPolicyTimeBatchHandler} configured with the policy value
+ * {@code "PT5S,dynamic,1,host"} a sequence of events for two hosts. The collector
+ * asserts that every alert it receives names {@code host1} in its second column.
+ */
+public class TestNoDataPolicyTimeBatchHandler {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
+
+	private static final String inputStream = "testInputStream";
+	private static final String outputStream = "testOutputStream";
+
+	@Before
+	public void setup() {
+		// no shared fixtures
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testDynamic1() throws Exception {
+		Map<String, StreamDefinition> definitions = new HashMap<>();
+		definitions.put(inputStream, buildStreamDef());
+		definitions.put(outputStream, buildOutputStreamDef());
+
+		PolicyHandlerContext handlerContext = new PolicyHandlerContext();
+		handlerContext.setPolicyDefinition(buildPolicyDef_dynamic());
+
+		NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(definitions);
+		handler.prepare(new TestCollector(), handlerContext);
+
+		// every event carries this same event time
+		final long eventTime = System.currentTimeMillis();
+
+		handler.send(buildStreamEvt(eventTime, "host1", 12.5));
+		Thread.sleep(2000);
+
+		handler.send(buildStreamEvt(eventTime, "host2", 12.6));
+		handler.send(buildStreamEvt(eventTime, "host1", 20.9));
+		handler.send(buildStreamEvt(eventTime, "host2", 22.1));
+		handler.send(buildStreamEvt(eventTime, "host2", 22.1));
+		Thread.sleep(5000);
+
+		handler.send(buildStreamEvt(eventTime, "host2", 22.1));
+		handler.send(buildStreamEvt(eventTime, "host2", 22.3));
+		Thread.sleep(5000);
+
+		handler.send(buildStreamEvt(eventTime, "host2", 22.9));
+		handler.send(buildStreamEvt(eventTime, "host1", 41.6));
+		handler.send(buildStreamEvt(eventTime, "host2", 45.6));
+		Thread.sleep(1000);
+	}
+
+	/** Collector that logs each alert and asserts its second column is {@code host1}. */
+	@SuppressWarnings("rawtypes")
+	private static class TestCollector implements Collector {
+		@Override
+		public void emit(Object o) {
+			Object[] columns = ((AlertStreamEvent) o).getData();
+
+			LOG.info("alert data: {}, {}", columns[1], columns[0]);
+
+			Assert.assertEquals("host1", columns[1]);
+		}
+	}
+
+	/** Builds the no-data policy that reads the input stream and writes the output stream. */
+	private PolicyDefinition buildPolicyDef_dynamic() {
+		PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+		definition.setType("nodataalert");
+		definition.setValue("PT5S,dynamic,1,host");
+
+		PolicyDefinition policy = new PolicyDefinition();
+		policy.setName("nodataalert-test");
+		policy.setDefinition(definition);
+		policy.setInputStreams(Arrays.asList(inputStream));
+		policy.setOutputStreams(Arrays.asList(outputStream));
+		return policy;
+	}
+
+	/** Input stream schema: timestamp (LONG), host (STRING), value (DOUBLE). */
+	private StreamDefinition buildStreamDef() {
+		StreamDefinition definition = new StreamDefinition();
+		definition.setColumns(Arrays.asList(
+				column("timestamp", StreamColumn.Type.LONG),
+				column("host", StreamColumn.Type.STRING),
+				column("value", StreamColumn.Type.DOUBLE)));
+		definition.setDataSource("testDataSource");
+		definition.setStreamId(inputStream);
+		return definition;
+	}
+
+	/** Output stream schema: timestamp (LONG), host (STRING), originalStreamName (STRING). */
+	private StreamDefinition buildOutputStreamDef() {
+		StreamDefinition definition = new StreamDefinition();
+		definition.setColumns(Arrays.asList(
+				column("timestamp", StreamColumn.Type.LONG),
+				column("host", StreamColumn.Type.STRING),
+				column("originalStreamName", StreamColumn.Type.STRING)));
+		definition.setDataSource("testDataSource");
+		definition.setStreamId(outputStream);
+		return definition;
+	}
+
+	/** Creates a stream column with the given name and type. */
+	private static StreamColumn column(String name, StreamColumn.Type type) {
+		StreamColumn streamColumn = new StreamColumn();
+		streamColumn.setName(name);
+		streamColumn.setType(type);
+		return streamColumn;
+	}
+
+	/** Creates an input-stream event whose data is {@code [ts, host, value]}. */
+	private StreamEvent buildStreamEvt(long ts, String host, double value) {
+		Object[] payload = new Object[] { ts, host, value };
+		StreamEvent streamEvent = new StreamEvent();
+		streamEvent.setStreamId(inputStream);
+		streamEvent.setTimestamp(ts);
+		streamEvent.setData(payload);
+		return streamEvent;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
new file mode 100644
index 0000000..4ce9805
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/eagle.siddhiext
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
index 73e5b30..3f2fbc3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
@@ -18,12 +18,12 @@
     "numOfTotalWorkers": 20,
     "numOfSpoutTasks" : 1,
     "numOfRouterBolts" : 4,
-    "numOfAlertBolts" : 10,
+    "numOfAlertBolts" : 20,
     "numOfPublishTasks" : 1,
     "localMode" : "true"
   },
   "spout" : {
-    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkQuorum": "127.0.0.1:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
     "stormKafkaTransactionZkQuorum": "",
@@ -33,7 +33,7 @@
     "stormKafkaFetchSizeBytes": 1048586,
   },
   "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
+    "zkQuorum" : "127.0.0.1:2181",
     "zkRoot" : "/alert",
     "zkSessionTimeoutMs" : 10000,
     "connectionTimeoutMs" : 10000,
@@ -55,6 +55,12 @@
   	"context" : "/rest"
   },
   "kafkaProducer": {
-  	"bootstrapServers": "localhost:9092"
-  }
+  	"bootstrapServers": "127.0.0.1:9092"
+  },
+	"email": {
+		"sender": "eagle@eagle.com",
+		"recipients": "test@eagle.com",
+		"mail.smtp.host": "test.eagle.com",
+		"mail.smtp.port": "25"
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
index 411cc48..984fcdb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
@@ -2,7 +2,7 @@
 {
 	"name": "alertUnitTopology_1",
 	"numOfSpout":1,
-	"numOfAlertBolt": 10,
+	"numOfAlertBolt": 20,
 	"numOfGroupBolt": 4,
 	"spoutId": "alertEngineSpout",
 	"groupNodeIds" : [
@@ -21,7 +21,17 @@
 		"alertBolt6",
 		"alertBolt7",
 		"alertBolt8",
-		"alertBolt9"
+		"alertBolt9",
+		"alertBolt10",
+		"alertBolt11",
+		"alertBolt12",
+		"alertBolt13",
+		"alertBolt14",
+		"alertBolt15",
+		"alertBolt16",
+		"alertBolt17",
+		"alertBolt18",
+		"alertBolt19"
 	],
 	"pubBoltId" : "alertPublishBolt",
 	"spoutParallelism": 1,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ebbaad09/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
index 97edc5a..72a731a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-service/src/main/resources/application.conf
@@ -20,7 +20,7 @@
 		"policyDefaultParallelism" : 5,
 		"boltLoadUpbound": 0.8,
 		"topologyLoadUpbound" : 0.8,
-		"numOfAlertBoltsPerTopology" : 5,
+		"numOfAlertBoltsPerTopology" : 20,
 		"zkConfig" : {
 			"zkQuorum" : "127.0.0.1:2181",
 			"zkRoot" : "/alert",
@@ -37,6 +37,15 @@
 		"metadataDynamicCheck" : {
 			"initDelayMillis" : 1000,
 			"delayMillis" : 30000
+		},
+		"kafkaProducer": {
+			"bootstrapServers": "127.0.0.1:9092"
+		},
+		"email": {
+			"sender": "eagle@eagle.com",
+			"recipients": "test@eagle.com",
+			"mailSmtpHost": "test.eagle.com",
+			"mailSmtpPort": "25"
 		}
 	},
 	"datastore": {