Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/07 00:53:34 UTC

[4/6] incubator-eagle git commit: Rebase code base

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
index 18089a9..e6b510a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
index d2473a9..f784456 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
index 76d2294..39baf2b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
@@ -16,14 +16,14 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 public class JavaObjectSerializer implements Serializer<Object> {
     @Override
     public void serialize(Object value, DataOutput dataOutput) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
index 8d85c76..116b275 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
index 714920e..5a3d77d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
@@ -16,8 +16,10 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -26,7 +28,8 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
 import org.apache.eagle.alert.engine.serialization.Serializer;
 import org.apache.eagle.alert.engine.utils.CompressionUtils;
 
-import java.io.*;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 
 /**
  * Stream Metadata Cached Serializer

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
index 113816f..0fb686b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -16,6 +16,11 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -23,11 +28,6 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
 import org.apache.eagle.alert.engine.serialization.Serializer;
 import org.apache.eagle.alert.engine.serialization.Serializers;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.BitSet;
-
 /**
  * @see StreamEvent
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
index 1268cb8..f35da39 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
@@ -16,13 +16,19 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.serialization.Serializer;
 
 /**
  * Don't serialize streamId

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
index 67de517..4105277 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
@@ -16,16 +16,16 @@
  */
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Don't serialize streamId
  *

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
index 2a1541a..940024d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
@@ -1,11 +1,11 @@
 package org.apache.eagle.alert.engine.serialization.impl;
 
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index cd23405..f54d5cd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -304,7 +304,12 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
     protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
                                                  String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception{
         String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
-        BrokerHosts hosts = new ZkHosts(kafkaBrokerZkQuorum);
+        BrokerHosts hosts = null;
+        if (config.hasPath("spout.kafkaBrokerZkBasePath")) {
+            hosts = new ZkHosts(kafkaBrokerZkQuorum, config.getString("spout.kafkaBrokerZkBasePath"));
+        } else {
+            hosts = new ZkHosts(kafkaBrokerZkQuorum);
+        }
         String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
         if(config.hasPath("spout.stormKafkaTransactionZkPath")) {
             transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
@@ -335,10 +340,14 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
             spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime");
         }
 
-        spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic));
+        spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic, conf));
         KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
         SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds,this.serializer);
         wrapper.open(conf, context, collectorWrapper);
+        
+        if (LOG.isInfoEnabled()) {
+            LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
+        }
         return wrapper;
     }
 

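The hunk above makes the Kafka broker ZooKeeper base path optional. Below is a minimal sketch of the same fallback in isolation, assuming the storm-kafka ZkHosts constructors and the Typesafe Config instance this spout already holds (the factory class name is illustrative, not part of the commit):

import com.typesafe.config.Config;

import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;

class ZkHostsFactory {
    static BrokerHosts fromConfig(Config config) {
        String zkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
        // When Kafka registers its brokers under a ZooKeeper chroot, the extra
        // base path argument points storm-kafka at that location instead of the
        // default /brokers path.
        if (config.hasPath("spout.kafkaBrokerZkBasePath")) {
            return new ZkHosts(zkQuorum, config.getString("spout.kafkaBrokerZkBasePath"));
        }
        return new ZkHosts(zkQuorum);
    }
}
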
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
index a2c9219..7e0dd3f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
@@ -21,23 +21,34 @@ package org.apache.eagle.alert.engine.spout;
 import java.util.Properties;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
 import org.slf4j.Logger;
 
 /**
  * normally this is used in unit test for convenience
  */
 public class CreateTopicUtils {
+
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class);
+
     private static final int partitions = 2;
     private static final int replicationFactor = 1;
-    public static void ensureTopicReady(String zkQuorum, String topic){
+
+    public static void ensureTopicReady(String zkQuorum, String topic) {
+        ZkConnection zkConnection = new ZkConnection(zkQuorum);
         ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
-        if(!AdminUtils.topicExists(zkClient, topic)) {
-            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + replicationFactor);
-            AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+        if (!AdminUtils.topicExists(zkUtils, topic)) {
+            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor "
+                    + replicationFactor);
+            AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, new Properties(),
+                    RackAwareMode.Disabled$.MODULE$);
         }
     }
+
 }

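ensureTopicReady() now goes through the ZkUtils/RackAwareMode flavour of AdminUtils expected by newer Kafka clients. A minimal usage sketch, assuming a test that only needs the input topic to exist before producing (the ZooKeeper address is illustrative):

import org.apache.eagle.alert.engine.spout.CreateTopicUtils;

public class TopicBootstrap {
    public static void main(String[] args) {
        // Assumed local ZooKeeper, e.g. the one backing an embedded Kafka in tests.
        CreateTopicUtils.ensureTopicReady("localhost:2181", "perfmon_metrics");
    }
}
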
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
index 223f1b5..bfd5da7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
@@ -19,6 +19,8 @@
 
 package org.apache.eagle.alert.engine.spout;
 
+import java.util.Map;
+
 import backtype.storm.spout.Scheme;
 
 
@@ -28,8 +30,10 @@ import backtype.storm.spout.Scheme;
  * 2) has one constructor with topic name as parameter
  */
 public class SchemeBuilder {
-    public static Scheme buildFromClsName(String clsName, String topic) throws Exception{
-        Object o = Class.forName(clsName).getConstructor(String.class).newInstance(topic);
+
+    @SuppressWarnings("rawtypes")
+    public static Scheme buildFromClsName(String clsName, String topic, Map conf) throws Exception{
+        Object o = Class.forName(clsName).getConstructor(String.class, Map.class).newInstance(topic, conf);
         return (Scheme)o;
     }
 }

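buildFromClsName() now looks up a (String, Map) constructor reflectively, so any scheme referenced by schemeClsName has to provide one. An illustrative conforming scheme, assuming the byte[]-based backtype.storm Scheme interface this module builds against (class name and output field are made up):

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;

@SuppressWarnings("rawtypes")
public class PlainStringScheme implements Scheme {
    private final String topic;
    private final Map conf;

    // This two-argument constructor is the contract the reflective call relies on.
    public PlainStringScheme(String topic, Map conf) {
        this.topic = topic;
        this.conf = conf;
    }

    @Override
    public List<Object> deserialize(byte[] ser) {
        // Emit the raw payload as a single string field.
        return Collections.singletonList(new String(ser, StandardCharsets.UTF_8));
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("f1");
    }
}
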
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index b37f7b3..2f7cc68 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -18,7 +18,6 @@
  */
 package org.apache.eagle.alert.engine.spout;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -112,7 +111,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
         Object streamId = convertedTuple.get(1);
 
         StreamDefinition sd = sds.get(streamId);
-        if(sd == null){
+        if (sd == null) {
             LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds);
             spout.ack(newMessageId);
             return null;
@@ -141,17 +140,17 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
                     }
                     // send message to StreamRouterBolt
                     PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash);
-                    if(this.serializer == null){
-                         delegate.emit(sid, Collections.singletonList(pEvent), newMessageId);
-                    }else {
+                    if (this.serializer == null) {
+                        delegate.emit(sid, Collections.singletonList(pEvent), newMessageId);
+                    } else {
                         try {
                             delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId);
-                        } catch (IOException e) {
-                            LOG.error("Failed to serialize {}", pEvent, e);
-                            throw new RuntimeException(e);
+                        } catch (Exception e) {
+                            LOG.error("Failed to serialize {}, this message would be ignored!", pEvent, e);
+                            spout.ack(newMessageId);
                         }
                     }
-                }else{
+                } else {
                     // ******* short-cut ack ********
                     // we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty
                     // before moving to next offsets.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
index f526cad..0fbe7b3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
@@ -16,12 +16,15 @@
  */
 package org.apache.eagle.alert.engine.utils;
 
-import com.google.common.io.ByteStreams;
-
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import com.google.common.io.ByteStreams;
+
 
 public class CompressionUtils {
     public static byte[] compress(byte[] source) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index bfb5f54..e3be1b7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -24,7 +24,7 @@
     "localMode" : "true"
   },
   "spout" : {
-    "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+    "kafkaBrokerZkQuorum": "10.254.194.245:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
     "stormKafkaTransactionZkQuorum": "",
@@ -34,8 +34,8 @@
     "stormKafkaFetchSizeBytes": 1048586,
   },
   "zkConfig" : {
-    "zkQuorum" : "sandbox.hortonworks.com:2181",
-    "zkRoot" : "/alert",
+    "zkQuorum" : "10.254.194.245:2181",
+    "zkRoot" : "/kafka",
     "zkSessionTimeoutMs" : 10000,
     "connectionTimeoutMs" : 10000,
     "zkRetryTimes" : 3,
@@ -47,21 +47,21 @@
   },
   "metadataService": {
     "context" : "/rest",
-    "host" : "localhost",
-    "port" : 58080
+    "host" : "127.0.0.1",
+    "port" : 8080
   },
   "coordinatorService": {
     "host": "localhost",
-    "port": 58080,
+    "port": 9090,
     "context" : "/rest"
   }
   "metric":{
     "sink": {
-//      "kafka": {
-//        "topic": "alert_metric"
-//        "bootstrap.servers": "localhost:6667"
-//      }
-      "stdout": {}
+      // "kafka": {
+      //  "topic": "alert_metric"
+      //  "bootstrap.servers": "localhost:6667"
+      // }
+      //      "stdout": {}
       //      "elasticsearch": {
       //        "hosts": ["localhost:9200"]
       //        "index": "alert_metric"

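These keys are resolved through Typesafe Config at runtime. A quick illustrative read of a few of them, assuming ConfigFactory.load() picks up this application.conf from the classpath:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigPeek {
    public static void main(String[] args) {
        Config config = ConfigFactory.load(); // resolves application.conf on the classpath
        String kafkaZk = config.getString("spout.kafkaBrokerZkQuorum");
        String metadataHost = config.getString("metadataService.host");
        int metadataPort = config.getInt("metadataService.port");
        System.out.println("kafka zk=" + kafkaZk + ", metadata=" + metadataHost + ":" + metadataPort);
    }
}
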
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
index af99e2c..5e3d3b1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
@@ -21,6 +21,7 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
 
 ##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG
+log4j.logger.org.apache.eagle.alert.metric=ERROR
 log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG
 log4j.logger.org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector=DEBUG
 log4j.logger.org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
index 057aa73..927dfd7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -17,11 +17,20 @@
 package org.apache.eagle.alert.engine.e2e;
 
 import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.UnitTopologyMain;
@@ -30,7 +39,10 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -53,6 +65,7 @@ import com.typesafe.config.ConfigFactory;
  *
  */
 public class Integration1 {
+    private static final String SIMPLE_CONFIG = "/simple/application-integration.conf";
     private static final Logger LOG = LoggerFactory.getLogger(Integration1.class);
     private static final ObjectMapper om = new ObjectMapper();
 
@@ -61,16 +74,36 @@ public class Integration1 {
         inte.args = args;
         inte.test_simple_threshhold();
     }
-    
+
     private String[] args;
-    private ExecutorService executors = Executors.newFixedThreadPool(5);
+    private ExecutorService executors = Executors.newFixedThreadPool(5, new ThreadFactory() {
+        
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            return t;
+        }
+    });
+    private static KafkaEmbedded kafka;
+
+    @BeforeClass
+    public static void setup() {
+//        kafka = new KafkaEmbedded(9092, 2181);
+//        makeSureTopic("perfmon_metrics");
+    }
+
+    @AfterClass
+    public static void end() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
 
     /**
      * Assumption:
      * <p>
-     * start metadata service 8080, better in docker
-     * <p>
-     * start coordinator service 9090, better in docker
+     * start metadata service 8080 /rest
      * <p>
      * datasources : perfmon_datasource
      * <p>
@@ -84,20 +117,29 @@ public class Integration1 {
      * 
      * @throws InterruptedException
      */
-    @Ignore
     @Test
     public void test_simple_threshhold() throws Exception {
-        System.setProperty("config.resource", "/application-integration.conf");
+        System.setProperty("config.resource", SIMPLE_CONFIG);
         ConfigFactory.invalidateCaches();
         Config config = ConfigFactory.load();
 
         System.out.println("loading metadatas...");
-        loadMetadatas("/", config);
+        loadMetadatas("/simple/", config);
         System.out.println("loading metadatas done!");
 
+        if (args == null) {
+            args = new String[] { "-f", "simple/application-integration.conf" };
+        }
+
         executors.submit(() -> SampleClient1.main(args));
 
-        executors.submit(() -> UnitTopologyMain.main(args));
+        executors.submit(() -> { 
+            try {
+                UnitTopologyMain.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
 
         Utils.sleep(1000 * 5l);
         while (true) {
@@ -107,40 +149,20 @@ public class Integration1 {
         }
     }
 
-    /**
-     * Test only run expected when there is a missed config in the config file. mark as ignored
-     * @throws InterruptedException
-     * @throws ExecutionException
-     */
-    @Ignore
-    @Test(expected = ExecutionException.class)
-    public void test_typesafe_config() throws InterruptedException, ExecutionException {
-        System.setProperty("config.resource", "/application-integration.conf");
+    public static void makeSureTopic(String topic) {
+        System.setProperty("config.resource", SIMPLE_CONFIG);
         ConfigFactory.invalidateCaches();
-        Future<?> f = executors.submit(() -> {
-            UnitTopologyMain.main(null);
-        });
+        Config config = ConfigFactory.load();
+        ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config);
 
-        f.get();
+        ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
+        Properties topicConfiguration = new Properties();
+        ZkConnection zkConnection = new ZkConnection(zkconfig.zkQuorum);
+        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+        AdminUtils.createTopic(zkUtils, topic, 1, 1, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
     }
 
-//    @Test
-//    private void makeSureTopic() {
-//        System.setProperty("config.resource", "/application-integration.conf");
-//        ConfigFactory.invalidateCaches();
-//        Config config = ConfigFactory.load();
-//        ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config);
-//        
-//        CuratorFramework curator = CuratorFrameworkFactory.newClient(
-//                zkconfig.zkQuorum,
-//                zkconfig.zkSessionTimeoutMs,
-//                zkconfig.connectionTimeoutMs,
-//                new RetryNTimes(zkconfig.zkRetryTimes, zkconfig.zkRetryInterval)
-//        );
-//    }
-
     public static void proactive_schedule(Config config) throws Exception {
-
         try (CoordinatorClient cc = new CoordinatorClient(config)) {
             try {
                 String resp = cc.schedule();
@@ -205,7 +227,7 @@ public class Integration1 {
     public void testJson() throws Exception {
         {
             JavaType type = CollectionType.construct(List.class, SimpleType.construct(Topology.class));
-            List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/topologies.json"),
+            List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/simple/topologies.json"),
                     type);
             Topology t = (Topology) l.get(0);
 
@@ -216,16 +238,16 @@ public class Integration1 {
         {
             JavaType type = CollectionType.construct(List.class, SimpleType.construct(Publishment.class));
             // publishment
-            List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/publishments.json"), type);
+            List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/simple/publishments.json"), type);
             Publishment p = l.get(0);
             Assert.assertEquals("KAFKA", p.getType());
         }
         
-        checkAll("/");
+        checkAll("/simple/");
         checkAll("/correlation/");
     }
 
-    private void checkAll(String base) throws Exception {
+    public static void checkAll(String base) throws Exception {
         loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
         loadEntities(base + "policies.json", PolicyDefinition.class);
         loadEntities(base + "publishments.json", Publishment.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
index 7ea0e7e..a11cc66 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
@@ -67,7 +67,12 @@ public class Integration2 {
         Config config = ConfigFactory.load();
         Integration1.loadMetadatas("/correlation/", config);
 
-        executors.submit(() -> UnitTopologyMain.main(args));
+        executors.submit(() -> {
+            try {
+                UnitTopologyMain.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }});
 
         executors.submit(() -> SampleClient2.main(args));
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
new file mode 100644
index 0000000..667d241
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 6/29/16.
+ */
+public class Integration4NoDataAlert {
+    private String[] args;
+
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+
+    private static KafkaEmbedded kafka;
+
+    @BeforeClass
+    public static void setup() {
+        // FIXME : start local kafka
+    }
+
+    @AfterClass
+    public static void end() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
+    @Test
+    public void testTriggerNoData() throws Exception{
+        System.setProperty("config.resource", "/nodata/application-nodata.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+
+        System.out.println("loading metadatas...");
+        Integration1.loadMetadatas("/nodata/", config);
+        System.out.println("loading metadatas done!");
+
+
+        executors.submit(() -> {
+            try {
+                UnitTopologyMain.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        // wait 20 seconds for topology to bring up
+        try{
+            Thread.sleep(20000);
+        }catch(Exception ex){}
+
+        // send mock data
+        executors.submit(() -> {
+            try {
+                SampleClient4NoDataAlert.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+
+        Utils.sleep(1000 * 5l);
+        while (true) {
+            Integration1.proactive_schedule(config);
+
+            Utils.sleep(1000 * 60l * 5);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
new file mode 100644
index 0000000..c044da0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class MetadataServiceClientImpTest {
+
+    @Test @Ignore
+    public void test() {
+        System.out.println("loading metadatas...");
+        try {
+            System.setProperty("config.resource", "/application-integration.conf");
+            ConfigFactory.invalidateCaches();
+            Config config = ConfigFactory.load();
+            loadMetadatas("/", config);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        System.out.println("loading metadatas done!");
+    }
+
+    private void loadMetadatas(String base, Config config) throws Exception {
+        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+        client.clear();
+
+        List<Kafka2TupleMetadata> metadata = Integration1.loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+        client.addDataSources(metadata);
+
+        List<PolicyDefinition> policies = Integration1.loadEntities(base + "policies.json", PolicyDefinition.class);
+        client.addPolicies(policies);
+
+        List<Publishment> pubs = Integration1.loadEntities(base + "publishments.json", Publishment.class);
+        client.addPublishments(pubs);
+
+        List<StreamDefinition> defs = Integration1.loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+        client.addStreamDefinitions(defs);
+
+        List<Topology> topos = Integration1.loadEntities(base + "topologies.json", Topology.class);
+        client.addTopologies(topos);
+
+        client.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java
deleted file mode 100644
index 12fbfea..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.e2e;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-public class MetadataServiceTest {
-
-    @Ignore
-    @Test
-    public void test() {
-        System.out.println("loading metadatas...");
-        try {
-            System.setProperty("config.resource", "/application-integration.conf");
-            ConfigFactory.invalidateCaches();
-            Config config = ConfigFactory.load();
-            loadMetadatas("/", config);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        System.out.println("loading metadatas done!");
-    }
-
-    private void loadMetadatas(String base, Config config) throws Exception {
-        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
-        client.clear();
-
-        List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
-        client.addDataSources(metadata);
-
-        List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class);
-        client.addPolicies(policies);
-
-        List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class);
-        client.addPublishments(pubs);
-
-        List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
-        client.addStreamDefinitions(defs);
-
-        List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
-        client.addTopologies(topos);
-
-        client.close();
-    }
-
-    private  <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
-        ObjectMapper om = new ObjectMapper();
-        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
-        List<T> l = om.readValue(Integration1.class.getResourceAsStream(path), type);
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
index 348bf78..6bff94b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
 
 import backtype.storm.utils.Utils;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 /**
  * @since May 9, 2016
  *
@@ -60,7 +63,8 @@ public class SampleClient1 {
         long base = System.currentTimeMillis();
         AtomicLong msgCount = new AtomicLong();
 
-        try (KafkaProducer<String, String> proceduer = createProceduer()) {
+        Config config = ConfigFactory.load();
+        try (KafkaProducer<String, String> proceduer = createProceduer(config)) {
             while (true) {
                 int hostIndex = 6;
                 for (int i = 0; i < hostIndex; i++) {
@@ -108,12 +112,10 @@ public class SampleClient1 {
         return Pair.of(base, JsonUtils.writeValueAsString(e));
     }
 
-    public static KafkaProducer<String, String> createProceduer() {
-
+    public static KafkaProducer<String, String> createProceduer(Config config) {
+        String servers = config.getString("kafkaProducer.bootstrapServers");
         Properties configMap = new Properties();
-        // String broker_list = zkconfig.zkQuorum;
-        // TODO: replace boot strap servers with new workable server
-        configMap.put("bootstrap.servers", "localhost:9092");
+        configMap.put("bootstrap.servers", servers);
         // configMap.put("metadata.broker.list", broker_list);
         configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

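createProceduer() now takes a Config and resolves kafkaProducer.bootstrapServers instead of the hard-coded localhost:9092. A minimal caller sketch, with the key supplied inline purely for illustration:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ProducerDemo {
    public static void main(String[] args) {
        // Inline config stands in for the kafkaProducer.bootstrapServers entry
        // normally provided by the application config.
        Config config = ConfigFactory.parseString(
                "kafkaProducer.bootstrapServers = \"localhost:9092\"");
        try (KafkaProducer<String, String> producer = SampleClient1.createProceduer(config)) {
            producer.send(new ProducerRecord<>("perfmon_metrics", "{\"host\":\"host1\",\"value\":1.0}"));
        }
    }
}
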
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
index 06148cc..ad0079c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
@@ -26,6 +26,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 
 import backtype.storm.utils.Utils;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 /**
  * @since May 10, 2016
  *
@@ -51,7 +54,6 @@ public class SampleClient2 {
         public String message;
         public String host;
     }
-    
 
     /**
      * @param args
@@ -60,8 +62,10 @@ public class SampleClient2 {
         AtomicLong base1 = new AtomicLong(System.currentTimeMillis());
         AtomicLong base2 = new AtomicLong(System.currentTimeMillis());
         AtomicLong count = new AtomicLong();
+        
+        Config config = ConfigFactory.load();
 
-        try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer()) {
+        try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer(config)) {
             while (true) {
                 nextUuid = String.format(instanceUuidTemp, UUID.randomUUID().toString());
                 nextReqId = String.format(reqIdTemp, UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
new file mode 100644
index 0000000..f0e0d80
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.utils.Utils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 6/29/16.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked"})
+public class SampleClient4NoDataAlert {
+    private static final Logger LOG = LoggerFactory.getLogger(SampleClient4NoDataAlert.class);
+    private static long currentTimestamp = 1467240000000L;
+    private static long interval = 3000L;
+    private static volatile boolean host1Muted = false;
+    private static volatile boolean host2Muted = false;
+    public static void main(String[] args) throws Exception {
+        System.setProperty("config.resource", "/nodata/application-nodata.conf");
+        ConfigFactory.invalidateCaches();
+
+        Config config = ConfigFactory.load();
+        KafkaProducer producer = createProducer(config);
+        ProducerRecord record = null;
+        Thread x = new MuteThread();
+        x.start();
+        while(true) {
+            if(!host1Muted) {
+                record = new ProducerRecord("noDataAlertTopic", createEvent("host1"));
+                producer.send(record);
+            }
+            if(!host2Muted) {
+                record = new ProducerRecord("noDataAlertTopic", createEvent("host2"));
+                producer.send(record);
+            }
+            record = new ProducerRecord("noDataAlertTopic", createEvent("host3"));
+            producer.send(record);
+            Utils.sleep(interval);
+            currentTimestamp += interval;
+        }
+    }
+
+    private static class MuteThread extends Thread{
+        @Override
+        public void run(){
+            try {
+                // sleep 10 seconds
+                Thread.sleep(10000);
+                // mute host1
+                LOG.info("mute host1");
+                host1Muted = true;
+                // sleep 70 seconds to trigger the no-data alert
+                LOG.info("sleeping 70 seconds to trigger the no-data alert");
+                Thread.sleep(70000);
+                // unmute host1
+                LOG.info("unmute host1");
+                host1Muted = false;
+                Thread.sleep(10000);
+                // mute host2
+                LOG.info("mute host2");
+                host2Muted = true;
+                // sleep 70 seconds to trigger the no-data alert
+                LOG.info("sleeping 70 seconds to trigger the no-data alert");
+                Thread.sleep(70000);
+                LOG.info("unmute host2");
+                host2Muted = false;
+            }catch(Exception ex){
+                ex.printStackTrace();
+            }
+        }
+    }
+
+    private static class NoDataEvent{
+        @JsonProperty
+        long timestamp;
+        @JsonProperty
+        String host;
+        @JsonProperty
+        double value;
+
+        public String toString(){
+            return "timestamp=" + timestamp + ",host=" + host + ",value=" + value;
+        }
+    }
+
+    private static String createEvent(String host) throws Exception{
+        NoDataEvent e = new NoDataEvent();
+        long expectTS = currentTimestamp + interval;
+        // randomly shift the timestamp back by 0-2 ms so events are not perfectly aligned
+        long adjust = Math.round(2 * Math.random());
+        e.timestamp = expectTS - adjust;
+        e.host = host;
+        e.value = 25.6;
+        LOG.info("sending event {} ",  e);
+        ObjectMapper mapper = new ObjectMapper();
+        String value = mapper.writeValueAsString(e);
+        return value;
+    }
+
+
+    public static KafkaProducer<String, String> createProducer(Config config) {
+        String servers = config.getString("kafkaProducer.bootstrapServers");
+        Properties configMap = new Properties();
+        configMap.put("bootstrap.servers", servers);
+        configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        configMap.put("request.required.acks", "1");
+        configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configMap);
+        return producer;
+    }
+}
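Note: createProducer() above reads the broker list from the kafkaProducer.bootstrapServers key of /nodata/application-nodata.conf. A minimal sketch of exercising the same wiring without the resource file follows; the broker address is a placeholder and the inline config is for illustration only (assumes the same package as SampleClient4NoDataAlert):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class NoDataClientSketch {
        public static void main(String[] args) {
            // Placeholder broker address; normally supplied via application-nodata.conf
            Config config = ConfigFactory.parseString("kafkaProducer.bootstrapServers = \"localhost:9092\"");
            KafkaProducer<String, String> producer = SampleClient4NoDataAlert.createProducer(config);
            // Send one hand-written event to the same topic the client above uses
            producer.send(new ProducerRecord<String, String>("noDataAlertTopic",
                    "{\"timestamp\":1467240000000,\"host\":\"host1\",\"value\":25.6}"));
            producer.close();
        }
    }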

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..f97b1a8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.junit.Test;
+
+/**
+ * Since 6/28/16.
+ */
+public class TestDistinctValuesInTimeWindow {
+    @Test
+    public void test(){
+        DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60*1000);
+        window.send("1", 0);
+        window.send("2", 1000);
+        window.send("3", 1000);
+        window.send("1", 30000);
+        window.send("2", 50000);
+        window.send("1", 62000);
+        Map<Object, Long> values = window.distinctValues();
+        System.out.println(values);
+    }
+
+    @Test
+    public void testSort(){
+        SortedMap<DistinctValuesInTimeWindow.ValueAndTime, DistinctValuesInTimeWindow.ValueAndTime> timeSortedMap =
+                new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator());
+        DistinctValuesInTimeWindow.ValueAndTime vt1 = new DistinctValuesInTimeWindow.ValueAndTime("1", 0);
+        timeSortedMap.put(vt1, vt1);
+        DistinctValuesInTimeWindow.ValueAndTime vt2 = new DistinctValuesInTimeWindow.ValueAndTime("2", 1000);
+        timeSortedMap.put(vt2, vt2);
+        DistinctValuesInTimeWindow.ValueAndTime vt3 = new DistinctValuesInTimeWindow.ValueAndTime("3", 1000);
+        timeSortedMap.put(vt3, vt3);
+        timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("1", 0));
+        DistinctValuesInTimeWindow.ValueAndTime vt4 = new DistinctValuesInTimeWindow.ValueAndTime("1", 30000);
+        timeSortedMap.put(vt4, vt4);
+        Iterator<?> it = timeSortedMap.entrySet().iterator();
+        while(it.hasNext()){
+            System.out.println(it.next());
+        }
+        timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("2", 1000));
+        DistinctValuesInTimeWindow.ValueAndTime vt5 = new DistinctValuesInTimeWindow.ValueAndTime("2", 50000);
+        timeSortedMap.put(vt5, vt5);
+        DistinctValuesInTimeWindow.ValueAndTime vt6 = new DistinctValuesInTimeWindow.ValueAndTime("1", 62000);
+        timeSortedMap.put(vt6, vt6);
+        it = timeSortedMap.entrySet().iterator();
+        while(it.hasNext()){
+            System.out.println(it.next());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
new file mode 100644
index 0000000..4149e17
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+/**
+ * Since 6/27/16.
+ */
+public class TestEventTable {
+    @Test
+    public void test() throws Exception{
+        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+                        "define stream expectStream (key string, src string);"+
+                                "define stream appearStream (key string, src string);"+
+                                "define table expectTable (key string, src string);"+
+                        "from expectStream insert into expectTable;" +
+                                "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;"
+        );
+
+        runtime.addCallback("outputStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                EventPrinter.print(events);
+            }
+        });
+
+        runtime.start();
+        runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{"host1","expectStream"});
+        Thread.sleep(2000);
+        runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{"host2","expectStream"});
+        Thread.sleep(2000);
+    }
+}
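Note: the filter (expectTable.key==key) in expectTable only lets an appearStream event through when its key is already present in expectTable, and the test above sends "host2" on appearStream while only "host1" was inserted, so the outputStream callback will most likely never fire. To actually see output printed, a matching key would have to be sent inside the same test, e.g.:

    // Illustrative follow-up send; "host1" is already in expectTable, so this
    // event should pass the 'in' filter and reach the outputStream callback.
    runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{"host1", "appearStream"});
    Thread.sleep(2000);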

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
new file mode 100644
index 0000000..569a3b0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+/**
+ * Since 6/27/16.
+ */
+public class TestNoDataAlert {
+    @Test
+    public void test() throws Exception{
+        String[] expectHosts = new String[]{"host_1","host_2","host_3","host_4","host_5","host_6","host_7","host_8"};
+//        String[] appearHosts = new String[]{"host_6","host_7","host_8"};
+//        String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"};
+
+        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+                "define stream appearStream (key string, src string);"+
+                        "define stream expectStream (key string, src string);"+
+                        "define table expectTable (key string, src string);"+
+                        "define trigger fiveSecTriggerStream at every 1 sec;"+
+                        "define trigger initAppearTriggerStream at 'start';"+
+                        "from expectStream insert into expectTable;"+
+                        "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;"+
+                        "from initAppearTriggerStream join expectTable insert into initAppearStream;"
+//                        "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" +
+//                        "from joinStream[k2 is null] select k1 insert current events into missingStream;"
+        );
+
+//        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+//                "define stream appearStream (key string, src string);"+
+//                        "define stream expectStream (key string, src string);"+
+//                        "define table expectTable (key string, src string);"+
+//                        "from expectStream insert into expectTable;"+
+//                        "from appearStream#window.time(10 sec)  as l right outer join expectTable as r on l.key == r.key select r.key as k2, l.key as k1 insert current events into joinStream;" +
+//                        "from joinStream[k1 is null] select k2 insert current events into missingStream;"
+////                "from joinStream insert into missingStream;"
+//
+//        );
+
+        runtime.addCallback("initAppearStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                EventPrinter.print(events);
+            }
+        });
+
+        runtime.start();
+        for(String host: expectHosts) {
+            runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{host,"expectStream"});
+        }
+
+//        for(String host:appearHosts) {
+//            runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"});
+//        }
+
+        Thread.sleep(5000);
+
+//        for(String host:appearHosts) {
+//            runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"});
+//        }
+//        Thread.sleep(10000);
+    }
+}
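Note: the commented-out statements carry the intended no-data idiom: periodically replay the expected keys, left-outer-join them against a time window of observed events, and treat rows whose observed side is null as missing. Pulled together purely as a readable restatement of those commented lines (not a verified execution plan), the tail of the plan would read roughly:

    // Readable restatement of the commented-out tail of the plan above (not verified):
    String missingDataTail =
            "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key "
                    + "select l.key as k1, r.key as k2 insert current events into joinStream;"
                    + "from joinStream[k2 is null] select k1 insert current events into missingStream;";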

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
new file mode 100644
index 0000000..6c48def
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Since 6/29/16.
+ */
+public class TestNoDataPolicyHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyHandler.class);
+    private static final String inputStream = "testInputStream";
+    private static final String outputStream = "testOutputStream";
+
+    @Test
+    public void test() throws Exception{
+        test(buildPolicyDef_provided());
+        test(buildPolicyDef_dynamic());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void test(PolicyDefinition pd) throws Exception{
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        StreamDefinition sd = buildStreamDef();
+        sds.put("testInputStream", sd);
+        NoDataPolicyHandler handler = new NoDataPolicyHandler(sds);
+
+        PolicyHandlerContext context = new PolicyHandlerContext();
+        context.setPolicyDefinition(pd);
+        handler.prepare(new TestCollector(), context);
+
+        handler.send(buildStreamEvt(0, "host1", 12.5));
+        handler.send(buildStreamEvt(0, "host2", 12.6));
+        handler.send(buildStreamEvt(100, "host1", 20.9));
+        handler.send(buildStreamEvt(120, "host2", 22.1));
+        handler.send(buildStreamEvt(4000, "host2", 22.1));
+        handler.send(buildStreamEvt(50000, "host2", 22.1));
+        handler.send(buildStreamEvt(60150, "host2", 22.3));
+        handler.send(buildStreamEvt(60450, "host2", 22.9));
+        handler.send(buildStreamEvt(75000, "host1", 41.6));
+        handler.send(buildStreamEvt(85000, "host2", 45.6));
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static class TestCollector implements Collector{
+        @Override
+        public void emit(Object o) {
+            AlertStreamEvent e = (AlertStreamEvent)o;
+            Object[] data = e.getData();
+            Assert.assertEquals("host2", data[1]);
+            LOG.info(e.toString());
+        }
+    }
+
+    private PolicyDefinition buildPolicyDef_provided(){
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("PT1M,provided,1,host,host1,host2");
+        def.setType("string");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList(inputStream));
+        pd.setOutputStreams(Arrays.asList(outputStream));
+        pd.setName("nodataalert-test");
+        return pd;
+    }
+
+    private PolicyDefinition buildPolicyDef_dynamic(){
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("PT1M,dynamic,1,host");
+        def.setType("string");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList(inputStream));
+        pd.setOutputStreams(Arrays.asList(outputStream));
+        pd.setName("nodataalert-test");
+        return pd;
+    }
+    private StreamDefinition buildStreamDef(){
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn valueColumn = new StreamColumn();
+        valueColumn.setName("value");
+        valueColumn.setType(StreamColumn.Type.DOUBLE);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+        sd.setDataSource("testDataSource");
+        sd.setStreamId("testStreamId");
+        return sd;
+    }
+
+    private StreamEvent buildStreamEvt(long ts, String host, double value){
+        StreamEvent e = new StreamEvent();
+        e.setData(new Object[]{ts, host, value});
+        e.setStreamId(inputStream);
+        e.setTimestamp(ts);
+        return e;
+    }
+}
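Note on the definition strings, since they are the only part of these policies that is not self-describing: PT1M is an ISO-8601 duration (a one-minute no-data window), "provided" carries the expected host values inline ("host1,host2"), while "dynamic" omits them and leaves the expected set to be discovered from the stream; the remaining tokens ("1" and the field name "host") appear to select the column the no-data check is keyed on. Under that reading, a variant with a longer window might look like the sketch below (illustrative only, not taken from the handler's parser):

    // Hypothetical variant: five-minute window, expected hosts listed explicitly.
    PolicyDefinition.Definition def = new PolicyDefinition.Definition();
    def.setType("string");
    def.setValue("PT5M,provided,1,host,host1,host2,host3");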

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index af79f96..23ddd69 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,10 +18,10 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -35,24 +35,25 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * @Since 5/14/16.
  */
 public class TestAlertPublisherBolt {
 
+    @SuppressWarnings("rawtypes")
     @Ignore
     @Test
     public void test() {
         Config config = ConfigFactory.load("application-test.conf");
         AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
-        publisher.init(config);
+        publisher.init(config, new HashMap());
         PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
         publisher.onPublishChange(spec.getPublishments(), null, null, null);
         AlertStreamEvent event = create("testAlertStream");
@@ -95,7 +96,7 @@ public class TestAlertPublisherBolt {
     @Test
     public void testAlertPublisher() throws Exception {
         AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
-        List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class);
+        List<Publishment> oldPubs = loadEntities("/publishments1.json", Publishment.class);
         List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
         alertPublisher.onPublishChange(oldPubs, null, null, null);
         alertPublisher.onPublishChange(null, null, newPubs, oldPubs);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
index 8c048cb..2de2073 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -16,16 +16,19 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
@@ -45,12 +48,17 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 public class TestStreamRouterBolt {
     private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
index 0347d50..a756ebe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
@@ -16,14 +16,9 @@
  */
 package org.apache.eagle.alert.engine.serialization;
 
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import com.esotericsoftware.kryo.*;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.util.BitSet;
+
 import org.apache.commons.lang.time.StopWatch;
 import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -35,12 +30,20 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.BitSet;
+import backtype.storm.serialization.DefaultKryoFactory;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
 
 
 public class PartitionedEventSerializerTest {
     private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
+    @SuppressWarnings("deprecation")
     @Test
     public void testPartitionEventSerialization() throws IOException {
         PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
@@ -76,6 +79,7 @@ public class PartitionedEventSerializerTest {
         Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
         LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
     }
+    @SuppressWarnings("deprecation")
     @Test
     public void testPartitionEventSerializationEfficiency() throws IOException {
         PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;