Posted to commits@eagle.apache.org by ha...@apache.org on 2016/06/02 07:07:49 UTC

[10/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
deleted file mode 100644
index 97e6310..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-@SuppressWarnings("serial")
-public class MockSampleMetadataFactory {
-    private static MockStreamMetadataService mockStreamMetadataServiceInstance = null;
-    public static MockStreamMetadataService createSingletonMetadataServiceWithSample(){
-        if(mockStreamMetadataServiceInstance!=null) return mockStreamMetadataServiceInstance;
-        mockStreamMetadataServiceInstance = new MockStreamMetadataService();
-        mockStreamMetadataServiceInstance.registerStream("sampleStream",createSampleStreamDefinition("sampleStream"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_1",createSampleStreamDefinition("sampleStream_1"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_2",createSampleStreamDefinition("sampleStream_2"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_3",createSampleStreamDefinition("sampleStream_3"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_4",createSampleStreamDefinition("sampleStream_4"));
-        return mockStreamMetadataServiceInstance;
-    }
-
-    public static StreamDefinition createSampleStreamDefinition(String streamId){
-        StreamDefinition sampleStreamDefinition = new StreamDefinition();
-        sampleStreamDefinition.setStreamId(streamId);
-        sampleStreamDefinition.setTimeseries(true);
-        sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for "+streamId);
-        List<StreamColumn> streamColumns = new ArrayList<>();
-
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
-        streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
-        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-//        streamColumns.add(new StreamColumn.Builder().name("value1").type(StreamColumn.Type.DOUBLE).build());
-//        streamColumns.add(new StreamColumn.Builder().name("value2").type(StreamColumn.Type.DOUBLE).build());
-//        streamColumns.add(new StreamColumn.Builder().name("value3").type(StreamColumn.Type.DOUBLE).build());
-//        streamColumns.add(new StreamColumn.Builder().name("value4").type(StreamColumn.Type.DOUBLE).build());
-//        streamColumns.add(new StreamColumn.Builder().name("value5").type(StreamColumn.Type.DOUBLE).build());
-        sampleStreamDefinition.setColumns(streamColumns);
-        return sampleStreamDefinition;
-    }
-
-    /**
-     * By default, the window period is PT1m.
-     *
-     * @param streamId
-     * @return
-     */
-    public static StreamSortSpec createSampleStreamSortSpec(String streamId){
-        StreamSortSpec streamSortSpec = new StreamSortSpec();
-//        streamSortSpec.setColumn("timestamp");
-//        streamSortSpec.setStreamId(streamId);
-        streamSortSpec.setWindowMargin(1000);
-        streamSortSpec.setWindowPeriod("PT1m");
-        return streamSortSpec;
-    }
-
-    public static StreamSortSpec createSampleStreamSortSpec(String streamId,String period,int margin){
-        StreamSortSpec streamSortSpec = new StreamSortSpec();
-//        streamSortSpec.setColumn("timestamp");
-//        streamSortSpec.setStreamId(streamId);
-        streamSortSpec.setWindowMargin(margin);
-        streamSortSpec.setWindowPeriod(period);
-        return streamSortSpec;
-    }
-
-    /**
-     * Policy: from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;
-     *
-     * @return PolicyDefinition[from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;]
-     */
-    public static PolicyDefinition createSingleMetricSamplePolicy(){
-        String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;";
-        PolicyDefinition policyDefinition = new PolicyDefinition();
-        policyDefinition.setName("SamplePolicyForTest");
-        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1"));
-        policyDefinition.setOutputStreams(Arrays.asList("outputStream"));
-        policyDefinition.setDefinition(new PolicyDefinition.Definition(
-                PolicyStreamHandlers.SIDDHI_ENGINE,
-                definePolicy
-        ));
-        policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1",Arrays.asList("name"))));
-        return policyDefinition;
-    }
-
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
-        StreamPartition streamPartition = new StreamPartition();
-        streamPartition.setStreamId(streamId);
-        streamPartition.setColumns(new ArrayList<>(groupByField));
-        streamPartition.setType(StreamPartition.Type.GROUPBY);
-        StreamSortSpec streamSortSpec = new StreamSortSpec();
-        streamSortSpec.setWindowPeriod("PT30m");
-        streamSortSpec.setWindowMargin(10000);
-        streamPartition.setSortSpec(streamSortSpec);
-        return streamPartition;
-    }
-
-    public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds){
-        List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream().map((t) -> {
-            return new WorkSlot("sampleTopology", t);
-        }).toArray(WorkSlot[]::new));
-        StreamRouterSpec streamRouteSpec = new StreamRouterSpec();
-        streamRouteSpec.setStreamId(streamId);
-        streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId,Arrays.asList(groupByField)));
-        streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots)));
-        return streamRouteSpec;
-    }
-
-    public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds){
-        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
-    }
-
-    /**
-     * GROUPBY_sampleStream_1_ON_name
-     *
-     * @param targetEvaluatorIds
-     * @return
-     */
-    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds){
-        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
-    }
-
-    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds){
-        return createSampleStreamRouteSpec("sampleStream_2","name",targetEvaluatorIds);
-    }
-
-    public static PartitionedEvent createSimpleStreamEvent()  {
-        StreamEvent event = null;
-        try {
-            event = StreamEvent.Builder()
-                .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1"))
-                .streamId("sampleStream_1")
-                .timestamep(System.currentTimeMillis())
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","cpu");
-                    put("value",60.0);
-                    put("unknown","unknown column value");
-                }}).build();
-        } catch (StreamDefinitionNotFoundException e) {
-            e.printStackTrace();
-        }
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        return pEvent;
-    }
-
-    private final static String[] SAMPLE_STREAM_NAME_OPTIONS=new String[]{
-            "cpu","memory","disk","network"
-    };
-
-    private final static String[] SAMPLE_STREAM_HOST_OPTIONS =new String[]{
-            "localhost_1","localhost_2","localhost_3","localhost_4"
-    };
-
-    private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS=new Boolean[]{
-            true,false
-    };
-
-    private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS=new Double[]{
-            -0.20, 40.4,50.5,60.6,10000.1
-    };
-    private final static String[] SAMPLE_STREAM_ID_OPTIONS=new String[]{
-            "sampleStream_1","sampleStream_2","sampleStream_3","sampleStream_4",
-    };
-    private final static Random RANDOM = ThreadLocalRandom.current();
-
-    public static StreamEvent createRandomStreamEvent()  {
-        return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]);
-    }
-
-    public static StreamEvent createRandomStreamEvent(String streamId)  {
-        return createRandomStreamEvent(streamId,System.currentTimeMillis());
-    }
-
-    private final static Long[] TIME_DELTA_OPTIONS = new Long[]{
-            -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
-    };
-
-    public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId)  {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
-        return event;
-    }
-
-
-    public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId)  {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
-        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
-    }
-
-    public static PartitionedEvent createPartitionedEventGroupedByName(String streamId,long timestamp)  {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(timestamp);
-        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
-    }
-
-    public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId)  {
-        StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis());
-        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
-    }
-
-    public static StreamEvent createRandomStreamEvent(String streamId, long timestamp)  {
-        StreamEvent event;
-        try {
-            event = StreamEvent.Builder()
-                    .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
-                    .streamId(streamId)
-                    .timestamep(timestamp)
-                    .attributes(new HashMap<String,Object>(){{
-                        put("name",SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
-                        put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-                        put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
-                        put("flag",SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
-//                        put("value1", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value2", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value3", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value4", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-//                        put("value5", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-                        put("unknown","unknown column value");
-                    }}).build();
-        } catch (StreamDefinitionNotFoundException e) {
-            throw new IllegalStateException(e.getMessage(),e);
-        }
-        return event;
-    }
-
-    public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp)  {
-        StreamEvent event = createRandomStreamEvent(streamId,timestamp);
-        PartitionedEvent partitionedEvent = new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
-        return partitionedEvent;
-    }
-}
\ No newline at end of file
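
For reference, a minimal sketch of how this deleted factory is typically driven
from a test. Only methods defined in the file above are used; the wrapper class
is hypothetical and the getters are assumed from the corresponding setters:

    import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
    import org.apache.eagle.alert.engine.model.PartitionedEvent;
    import org.apache.eagle.alert.engine.model.StreamEvent;

    public class MockFactoryUsageSketch {
        public static void main(String[] args) {
            // Random events draw from the sample name/host/flag/value option arrays
            // and are grouped by the "name" column with a PT30m sort window.
            PartitionedEvent pe = MockSampleMetadataFactory
                    .createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis());
            StreamEvent event = pe.getEvent();  // getter assumed from setEvent() above
            System.out.println(event.getStreamId() + " @ " + event.getTimestamp());
        }
    }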

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
deleted file mode 100755
index fa07701..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MockStreamCollector implements Collector<AlertStreamEvent> {
-    @SuppressWarnings("unused")
-    private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class);
-    private List<AlertStreamEvent> cache;
-    public MockStreamCollector(){
-        cache = new LinkedList<>();
-    }
-
-    public void emit(AlertStreamEvent event) {
-        cache.add(event);
-        // LOG.info("PartitionedEventCollector received: {}",event);
-    }
-
-    public void clear(){
-        cache.clear();
-    }
-
-    public List<AlertStreamEvent> get(){
-        return cache;
-    }
-
-    public int size(){
-        return cache.size();
-    }
-}
\ No newline at end of file
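
A sketch of the intended usage: the collector is handed to the component under
test, which emits alerts into it, and the test then inspects the cache:

    import org.apache.eagle.alert.engine.mock.MockStreamCollector;
    import org.apache.eagle.alert.engine.model.AlertStreamEvent;

    public class MockCollectorUsageSketch {
        public static void main(String[] args) {
            MockStreamCollector collector = new MockStreamCollector();
            AlertStreamEvent alert = new AlertStreamEvent();
            collector.emit(alert);  // normally called by the evaluator under test
            System.out.println("collected " + collector.size() + " alert(s)");
            collector.clear();      // reset between test cases
        }
    }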

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
deleted file mode 100644
index 2119ecd..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-
-public class MockStreamMetadataService{
-    private final Map<String,StreamDefinition> streamSchemaMap = new HashMap<>();
-
-    public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
-        if(streamSchemaMap.containsKey(streamId)) {
-            return streamSchemaMap.get(streamId);
-        }else {
-            throw new StreamDefinitionNotFoundException(streamId);
-        }
-    }
-
-    public void registerStream(String streamId, StreamDefinition schema){
-        streamSchemaMap.put(streamId,schema);
-    }
-}
\ No newline at end of file
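
The register/lookup round trip, including the not-found path (a sketch reusing
createSampleStreamDefinition from MockSampleMetadataFactory above):

    import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
    import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
    import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
    import org.apache.eagle.alert.engine.mock.MockStreamMetadataService;

    public class MetadataServiceUsageSketch {
        public static void main(String[] args) throws StreamDefinitionNotFoundException {
            MockStreamMetadataService service = new MockStreamMetadataService();
            service.registerStream("sampleStream_1",
                    MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"));
            StreamDefinition schema = service.getStreamDefinition("sampleStream_1");
            System.out.println(schema.getStreamId());
            // Looking up an unregistered id, e.g. getStreamDefinition("unknown"),
            // throws StreamDefinitionNotFoundException instead of returning null.
        }
    }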

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
deleted file mode 100644
index 1e44c2a..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@SuppressWarnings("serial")
-public class MockStreamReceiver extends BaseRichSpout {
-    private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class);
-    private SpoutOutputCollector collector;
-    private List<String> outputStreamIds;
-    public MockStreamReceiver(int partition){
-        outputStreamIds = new ArrayList<>(partition);
-        for(int i=0;i<partition;i++){
-            outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
-        }
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void close() {}
-
-    /**
-     * This mock does not reproduce the end-to-end logic of the correlation spout;
-     * it simply generates sample data for testing the downstream bolts.
-     */
-    @Override
-    public void nextTuple() {
-        PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-        LOG.info("Receive {}",event);
-        collector.emit(outputStreamIds.get(
-                // group by the first field in event i.e. name
-                (int) (event.getPartitionKey() % outputStreamIds.size())),
-                Collections.singletonList(event));
-        Utils.sleep(500);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String streamId:outputStreamIds) {
-            declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0));
-        }
-    }
-}
\ No newline at end of file
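
The spout above routes each event to one of its output streams by taking the
partition key modulo the stream count. That selection logic, isolated (the
values here are made up; a negative key would need Math.floorMod in general,
which the mock sidesteps because its sample names hash to positive values):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.eagle.alert.utils.StreamIdConversion;

    public class PartitionRoutingSketch {
        public static void main(String[] args) {
            int partitions = 3;
            List<String> outputStreamIds = new ArrayList<>(partitions);
            for (int i = 0; i < partitions; i++) {
                outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
            }
            long partitionKey = "cpu".hashCode();  // first event field, as in the spout
            String target = outputStreamIds.get((int) (partitionKey % outputStreamIds.size()));
            System.out.println("route to " + target);
        }
    }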

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
deleted file mode 100644
index 251e47f..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.perf;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Since 5/13/16.
- */
-public class TestSerDeserPer {
-    Object[] data = null;
-    @Before
-    public void before(){
-        int max = 100;
-        StringBuilder sb = new StringBuilder();
-        for(int i=0; i<max; i++){
-            sb.append("a");
-        }
-        data = new Object[]{sb.toString()};
-    }
-
-    @Test
-    public void testSerDeserPerf() throws Exception{
-        Kryo kryo = new Kryo();
-        Output output = new Output(new FileOutputStream("/tmp/file.bin"));
-        for(int i=0; i<1000; i++){
-            kryo.writeObject(output, constructPE());
-        }
-        output.close();
-        Input input = new Input(new FileInputStream("/tmp/file.bin"));
-        PartitionedEvent someObject = kryo.readObject(input, PartitionedEvent.class);
-        input.close();
-        Assert.assertTrue(someObject.getData().length == 1);
-    }
-
-    private PartitionedEvent constructPE(){
-        StreamEvent e = new StreamEvent();
-        e.setStreamId("testStreamId");
-        e.setTimestamp(1463159382000L);
-        e.setData(data);
-        StreamPartition sp = new StreamPartition();
-        List<String> col = new ArrayList<>();
-        col.add("host");
-        sp.setColumns(col);
-        StreamSortSpec sortSpec = new StreamSortSpec();
-        sortSpec.setWindowMargin(30000);
-        sortSpec.setWindowPeriod("PT1M");
-        sp.setSortSpec(sortSpec);
-        sp.setStreamId("testStreamId");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        PartitionedEvent pe = new PartitionedEvent();
-        pe.setEvent(e);
-        pe.setPartition(sp);
-        pe.setPartitionKey(1000);
-        return pe;
-    }
-
-    @Test
-    public void testSerDeserPerf2() throws Exception{
-        Kryo kryo = new Kryo();
-        Output output = new Output(new FileOutputStream("/tmp/file2.bin"));
-        for(int i=0; i<1000; i++){
-            kryo.writeObject(output, constructNewPE());
-        }
-        output.close();
-        Input input = new Input(new FileInputStream("/tmp/file2.bin"));
-        NewPartitionedEvent someObject = kryo.readObject(input, NewPartitionedEvent.class);
-        input.close();
-        Assert.assertTrue(someObject.getData().length == 1);
-    }
-
-    private NewPartitionedEvent constructNewPE(){
-        NewPartitionedEvent pe = new NewPartitionedEvent();
-        pe.setStreamId("testStreamId");
-        pe.setTimestamp(1463159382000L);
-        pe.setData(data);
-
-        pe.setType(StreamPartition.Type.GROUPBY);
-        List<String> col = new ArrayList<>();
-        col.add("host");
-        pe.setColumns(col);
-        pe.setPartitionKey(1000);
-
-        pe.setWindowMargin(30000);
-        pe.setWindowPeriod("PT1M");
-        return pe;
-    }
-
-    @Test
-    public void testSerDeserPerf3() throws Exception{
-        Kryo kryo = new Kryo();
-        Output output = new Output(new FileOutputStream("/tmp/file3.bin"));
-        for(int i=0; i<1000; i++){
-            kryo.writeObject(output, constructNewPE2());
-        }
-        output.close();
-        Input input = new Input(new FileInputStream("/tmp/file3.bin"));
-        NewPartitionedEvent2 someObject = kryo.readObject(input, NewPartitionedEvent2.class);
-        input.close();
-        Assert.assertTrue(someObject.getData().length == 1);
-    }
-
-    private NewPartitionedEvent2 constructNewPE2(){
-        NewPartitionedEvent2 pe = new NewPartitionedEvent2();
-        pe.setStreamId(100);
-        pe.setTimestamp(1463159382000L);
-        pe.setData(data);
-
-        pe.setType(1);
-        int[] col = new int[1];
-        col[0] = 1;
-        pe.setColumns(col);
-        pe.setPartitionKey(1000);
-
-        pe.setWindowMargin(30000);
-        pe.setWindowPeriod(60);
-        return pe;
-    }
-
-    public static class NewPartitionedEvent implements Serializable {
-        private static final long serialVersionUID = -3840016190614238593L;
-        // basic
-        private String streamId;
-        private long timestamp;
-        private Object[] data;
-
-        // stream partition
-        private StreamPartition.Type type;
-        private List<String> columns = new ArrayList<>();
-        private long partitionKey;
-
-        // sort spec
-        private String windowPeriod="";
-        private long windowMargin = 30 * 1000;
-
-        public NewPartitionedEvent(){
-        }
-
-        public String getStreamId() {
-            return streamId;
-        }
-
-        public void setStreamId(String streamId) {
-            this.streamId = streamId;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public void setTimestamp(long timestamp) {
-            this.timestamp = timestamp;
-        }
-
-        public Object[] getData() {
-            return data;
-        }
-
-        public void setData(Object[] data) {
-            this.data = data;
-        }
-
-        public StreamPartition.Type getType() {
-            return type;
-        }
-
-        public void setType(StreamPartition.Type type) {
-            this.type = type;
-        }
-
-        public List<String> getColumns() {
-            return columns;
-        }
-
-        public void setColumns(List<String> columns) {
-            this.columns = columns;
-        }
-
-        public long getPartitionKey() {
-            return partitionKey;
-        }
-
-        public void setPartitionKey(long partitionKey) {
-            this.partitionKey = partitionKey;
-        }
-
-        public String getWindowPeriod() {
-            return windowPeriod;
-        }
-
-        public void setWindowPeriod(String windowPeriod) {
-            this.windowPeriod = windowPeriod;
-        }
-
-        public long getWindowMargin() {
-            return windowMargin;
-        }
-
-        public void setWindowMargin(long windowMargin) {
-            this.windowMargin = windowMargin;
-        }
-    }
-
-    public static class NewPartitionedEvent2 implements Serializable {
-        private static final long serialVersionUID = -3840016190614238593L;
-        // basic
-        private int streamId;
-        private long timestamp;
-        private Object[] data;
-
-        // stream partition
-        private int type;
-        private int[] columns;
-        private long partitionKey;
-
-        // sort spec
-        private long windowPeriod;
-        private long windowMargin = 30 * 1000;
-
-        public NewPartitionedEvent2(){
-        }
-
-        public int getStreamId() {
-            return streamId;
-        }
-
-        public void setStreamId(int streamId) {
-            this.streamId = streamId;
-        }
-
-        public int getType() {
-            return type;
-        }
-
-        public void setType(int type) {
-            this.type = type;
-        }
-
-        public int[] getColumns() {
-            return columns;
-        }
-
-        public void setColumns(int[] columns) {
-            this.columns = columns;
-        }
-
-        public long getPartitionKey() {
-            return partitionKey;
-        }
-
-        public void setPartitionKey(long partitionKey) {
-            this.partitionKey = partitionKey;
-        }
-
-        public long getWindowPeriod() {
-            return windowPeriod;
-        }
-
-        public void setWindowPeriod(long windowPeriod) {
-            this.windowPeriod = windowPeriod;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-
-        public void setTimestamp(long timestamp) {
-            this.timestamp = timestamp;
-        }
-
-        public Object[] getData() {
-            return data;
-        }
-
-        public void setData(Object[] data) {
-            this.data = data;
-        }
-
-        public long getWindowMargin() {
-            return windowMargin;
-        }
-
-        public void setWindowMargin(long windowMargin) {
-            this.windowMargin = windowMargin;
-        }
-    }
-}
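
All three benchmarks above follow the same Kryo round trip: write 1000 objects
to a file-backed Output, then read the first back and assert on it, presumably
to compare the serialized footprint of the three event layouts. The bare
pattern, with a trivial payload standing in for constructPE():

    import java.io.FileInputStream;
    import java.io.FileOutputStream;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class KryoRoundTripSketch {
        public static void main(String[] args) throws Exception {
            Kryo kryo = new Kryo();
            Output output = new Output(new FileOutputStream("/tmp/roundtrip.bin"));
            for (int i = 0; i < 1000; i++) {
                kryo.writeObject(output, "payload-" + i);
            }
            output.close();  // flushes buffered bytes to disk
            Input input = new Input(new FileInputStream("/tmp/roundtrip.bin"));
            String first = kryo.readObject(input, String.class);  // first record only
            input.close();
            System.out.println(first);
        }
    }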

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
deleted file mode 100755
index e322099..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.runner.AlertBolt;
-import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Since 5/2/16.
- */
-@SuppressWarnings({"rawtypes", "unused"})
-public class TestAlertBolt {
-    /**
-     * The following invariant is guaranteed in
-     *
-     * @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{
-     *    if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
-     *      throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
-     *    }
-     * }
-     *
-     * @throws Exception
-     *
-     * Test case: two alerts should be generated even if their timestamps are very close to each other.
-     */
-    @Test
-    public void testAlertBolt() throws Exception{
-        final AtomicInteger alertCount = new AtomicInteger();
-        final Semaphore mutex = new Semaphore(0);
-        Config config = ConfigFactory.load();
-        PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
-        TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
-        AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
-        OutputCollector collector = new OutputCollector(new IOutputCollector(){
-            int count = 0;
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                alertCount.incrementAndGet();
-                mutex.release();
-                Assert.assertEquals("testAlertStream", tuple.get(0));
-                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
-                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-                return null;
-            }
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
-            @Override
-            public void ack(Tuple input) {            }
-            @Override
-            public void fail(Tuple input) {            }
-            @Override
-            public void reportError(Throwable error) {            }
-        });
-        Map stormConf = new HashMap<>();
-        TopologyContext topologyContext = mock(TopologyContext.class);
-        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-        bolt.prepare(stormConf, topologyContext, collector);
-
-        String streamId = "cpuUsageStream";
-
-        // construct StreamDefinition
-        StreamDefinition schema = new StreamDefinition();
-        schema.setStreamId(streamId);
-        StreamColumn column = new StreamColumn();
-        column.setName("col1");
-        column.setType(StreamColumn.Type.STRING);
-        schema.setColumns(Collections.singletonList(column));
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put(schema.getStreamId(), schema);
-
-        // construct StreamPartition
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Collections.singletonList("col1"));
-        sp.setStreamId(streamId);
-        sp.setType(StreamPartition.Type.GROUPBY);
-
-        AlertBoltSpec spec = new AlertBoltSpec();
-        spec.setVersion("version1");
-        spec.setTopologyName("testTopology");
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setName("policy1");
-        pd.setPartitionSpec(Collections.singletonList(sp));
-        pd.setOutputStreams(Collections.singletonList("testAlertStream"));
-        pd.setInputStreams(Collections.singletonList(streamId));
-        pd.setDefinition(new PolicyDefinition.Definition());
-        pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
-        pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;";
-        spec.addBoltPolicy("alertBolt1", pd.getName());
-        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd)));
-        bolt.onAlertBoltSpecChange(spec, sds);
-
-        // construct GeneralTopologyContext
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
-        // construct event with "value1"
-        StreamEvent event1 = new StreamEvent();
-        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
-        Object[] data = new Object[]{"value1"};
-        event1.setData(data);
-        event1.setStreamId(streamId);
-        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001);
-
-        // construct another event with "value2"
-        StreamEvent event2 = new StreamEvent();
-        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
-        data = new Object[]{"value2"};
-        event2.setData(data);
-        event2.setStreamId(streamId);
-        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001);
-
-        Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
-        Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
-        bolt.execute(input);
-        bolt.execute(input2);
-        Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
-        Assert.assertEquals(2, alertCount.get());
-        bolt.cleanup();
-    }
-}
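
The core of the test above is the Siddhi policy wiring. Isolated, using only
setters and fields that appear in the deleted test:

    import java.util.Collections;

    import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
    import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;

    public class PolicyDefinitionSketch {
        public static PolicyDefinition cpuPolicy() {
            PolicyDefinition pd = new PolicyDefinition();
            pd.setName("policy1");
            pd.setInputStreams(Collections.singletonList("cpuUsageStream"));
            pd.setOutputStreams(Collections.singletonList("testAlertStream"));
            pd.setDefinition(new PolicyDefinition.Definition());
            pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
            pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2']"
                    + " select col1 insert into testAlertStream;";
            return pd;
        }
    }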

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
deleted file mode 100644
index af79f96..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
-import org.apache.eagle.alert.engine.runner.MapComparator;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Since 5/14/16.
- */
-public class TestAlertPublisherBolt {
-
-    @Ignore
-    @Test
-    public void test() {
-        Config config = ConfigFactory.load("application-test.conf");
-        AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
-        publisher.init(config);
-        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-        publisher.onPublishChange(spec.getPublishments(), null, null, null);
-        AlertStreamEvent event = create("testAlertStream");
-        publisher.nextEvent(event);
-        AlertStreamEvent event1 = create("testAlertStream");
-        publisher.nextEvent(event1);
-    }
-
-    private AlertStreamEvent create(String streamId){
-        AlertStreamEvent alert = new AlertStreamEvent();
-        PolicyDefinition policy = new PolicyDefinition();
-        policy.setName("policy1");
-        alert.setPolicy(policy);
-        alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[]{"field_1", 2, "field_3"});
-        alert.setStreamId(streamId);
-        alert.setCreatedBy(this.toString());
-        return alert;
-    }
-
-    @Test
-    public void testMapComparator() {
-        PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-        PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
-        Map<String, Publishment> map1 = new HashMap<>();
-        Map<String, Publishment> map2 = new HashMap<>();
-        spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-        spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-        MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-        comparator.compare();
-        Assert.assertTrue(comparator.getModified().size() == 1);
-
-        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
-        AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublisher, null, null);
-        publisherBolt.onAlertPublishSpecChange(spec1, null);
-        publisherBolt.onAlertPublishSpecChange(spec2, null);
-    }
-
-    @Test
-    public void testAlertPublisher() throws Exception {
-        AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
-        List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class);
-        List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
-        alertPublisher.onPublishChange(oldPubs, null, null, null);
-        alertPublisher.onPublishChange(null, null, newPubs, oldPubs);
-    }
-
-    private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
-        ObjectMapper objectMapper = new ObjectMapper();
-        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
-        List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
-        return l;
-    }
-
-}
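
loadEntities() above builds the List<T> JavaType through the older
CollectionType.construct/SimpleType.construct calls; the TypeFactory route is
the usual equivalent (a sketch, not tied to any particular resource file):

    import java.io.InputStream;
    import java.util.List;

    import com.fasterxml.jackson.databind.JavaType;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TypedListSketch {
        public static <T> List<T> load(InputStream in, Class<T> clz) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, clz);
            return mapper.readValue(in, type);
        }
    }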

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
deleted file mode 100644
index 8c048cb..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStreamRouterBolt {
-    private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
-
-    /**
-     * Mocks 5 events.
-     *
-     * 1. Sent in random order:
-     * "value1","value2","value3","value4","value5"
-     *
-     * 2. Received in correct time order; value5 is dropped because it arrives too late: "value2","value1","value3","value4"
-     *
-     * @throws Exception
-     */
-    @SuppressWarnings("rawtypes")
-    @Test
-    public void testRouterWithSortAndRouteSpec() throws Exception{
-        Config config = ConfigFactory.load();
-        StreamRouterImpl routerImpl = new StreamRouterImpl("testStreamRouterImpl");
-        MockChangeService mockChangeService = new MockChangeService();
-        StreamRouterBolt bolt = new StreamRouterBolt(routerImpl, config, mockChangeService);
-
-        final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
-        final List<PartitionedEvent> orderCollected = new ArrayList<>();
-
-        OutputCollector collector = new OutputCollector(new IOutputCollector(){
-            int count = 0;
-            @Override
-            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-                PartitionedEvent event;
-                try {
-                    event = bolt.deserialize(tuple.get(0));
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-                if(count == 0) {
-                    count++;
-                }
-                LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
-                if(!streamCollected.containsKey(streamId)){
-                    streamCollected.put(streamId,new ArrayList<>());
-                }
-                streamCollected.get(streamId).add(event);
-                orderCollected.add(event);
-                return null;
-            }
-            @Override
-            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {            }
-            @Override
-            public void ack(Tuple input) {            }
-            @Override
-            public void fail(Tuple input) {            }
-            @SuppressWarnings("unused")
-            public void resetTimeout(Tuple input) {            }
-            @Override
-            public void reportError(Throwable error) {            }
-        });
-
-        Map stormConf = new HashMap<>();
-        TopologyContext topologyContext = mock(TopologyContext.class);
-        when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-        bolt.prepare(stormConf, topologyContext, collector);
-
-        String streamId = "cpuUsageStream";
-        // StreamPartition, groupby col1 for stream cpuUsageStream
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(streamId);
-        sp.setColumns(Collections.singletonList("col1"));
-        sp.setType(StreamPartition.Type.GROUPBY);
-
-        StreamSortSpec sortSpec = new StreamSortSpec();
-//        sortSpec.setColumn("timestamp");
-//        sortSpec.setOrder("asc");
-        sortSpec.setWindowPeriod2(Period.minutes(1));
-        sortSpec.setWindowMargin(1000);
-        sp.setSortSpec(sortSpec);
-
-        RouterSpec boltSpec = new RouterSpec();
-
-        // set StreamRouterSpec to have 2 WorkSlot
-        StreamRouterSpec routerSpec = new StreamRouterSpec();
-        routerSpec.setPartition(sp);
-        routerSpec.setStreamId(streamId);
-        PolicyWorkerQueue queue = new PolicyWorkerQueue();
-        queue.setPartition(sp);
-        queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
-        routerSpec.setTargetQueue(Collections.singletonList(queue));
-        boltSpec.addRouterSpec(routerSpec);
-
-        // construct StreamDefinition
-        StreamDefinition schema = new StreamDefinition();
-        schema.setStreamId(streamId);
-        StreamColumn column = new StreamColumn();
-        column.setName("col1");
-        column.setType(StreamColumn.Type.STRING);
-        schema.setColumns(Collections.singletonList(column));
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        sds.put(schema.getStreamId(), schema);
-
-        bolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
-        bolt.onStreamRouteBoltSpecChange(boltSpec, sds);
-        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-        int taskId = 1;
-        when(context.getComponentId(taskId)).thenReturn("comp1");
-        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
-        // =======================================
-        // Mock 5 Events
-        //
-        // 1. Sent in random order:
-        // "value1","value2","value3","value4","value5"
-        //
-        // 2. Received in correct time order; value5 is dropped because it is too late:
-        //    "value2","value1","value3","value4"
-        // =======================================
-
-        // construct event with "value1"
-        StreamEvent event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000);
-        Object[] data = new Object[]{"value1"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        pEvent.setPartition(sp);
-        Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
-
-        // construct another event with "value2"
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000);
-        data = new Object[]{"value2"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
-
-        // construct another event with "value3"
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000);
-        data = new Object[]{"value3"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
-
-        // construct another event with "value4"
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000);
-        data = new Object[]{"value4"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
-
-        // construct another event with "value5", which will be thrown because two late
-        event = new StreamEvent();
-        event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000);
-        data = new Object[]{"value5"};
-        event.setData(data);
-        event.setStreamId(streamId);
-        pEvent = new PartitionedEvent();
-        pEvent.setPartition(sp);
-        pEvent.setEvent(event);
-        input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
-        bolt.execute(input);
-
-        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
-        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt1",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt1"));
-        Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt2",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt2"));
-
-        Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
-        Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
-
-        // The first 3 events were already emitted, ticked automatically as the window advanced
-
-        bolt.cleanup();
-
-        // cleanup() flushes all events still buffered in memory, so we also receive the 4th event,
-        // whose window had not yet expired according to the event-time clock.
-        // The 5th event was dropped earlier because it arrived too late, outside the sort margin.
-
-        Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
-        Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size());
-        Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
-
-    }
-
-    @SuppressWarnings("serial")
-    public static class MockChangeService extends AbstractMetadataChangeNotifyService{
-        private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
-
-        @Override
-        public void close() throws IOException {
-            LOG.info("Closing");
-        }
-    }
-}
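
The ordering contract the test above verifies can be stated compactly: buffer
events per partition, emit them in ascending event-time order once they fall
behind the watermark by more than the margin, and drop anything that arrives
later than that. The sketch below is illustrative only -- the class and method
names are invented, not Eagle APIs, and the real bolt ties emission to the
configured window period as well as the margin.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    class TimeOrderedBuffer {
        static final class Event {
            final long timestamp;
            final Object data;
            Event(long timestamp, Object data) { this.timestamp = timestamp; this.data = data; }
        }

        private final long marginMs;                 // allowed lateness, like StreamSortSpec.windowMargin
        private long watermark = Long.MIN_VALUE;     // max event timestamp seen so far
        private final PriorityQueue<Event> buffer =
                new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

        TimeOrderedBuffer(long marginMs) { this.marginMs = marginMs; }

        /** Accept one event; return whatever is now safe to emit, in timestamp order. */
        List<Event> accept(Event e) {
            if (e.timestamp < watermark - marginMs) {
                return Collections.emptyList();      // too late: dropped, like "value5" above
            }
            watermark = Math.max(watermark, e.timestamp);
            buffer.add(e);
            List<Event> ready = new ArrayList<>();
            while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark - marginMs) {
                ready.add(buffer.poll());            // nothing earlier can still arrive
            }
            return ready;
        }

        /** Flush everything still buffered, as cleanup() does in the test above. */
        List<Event> flush() {
            List<Event> rest = new ArrayList<>();
            while (!buffer.isEmpty()) {
                rest.add(buffer.poll());
            }
            return rest;
        }
    }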

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
deleted file mode 100644
index 13d1015..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.ByteUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JavaSerializationTest {
-    private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
-
-    @Test
-    public void testJavaSerialization(){
-        PartitionedEvent partitionedEvent = new PartitionedEvent();
-        partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
-        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host")));
-        StreamEvent event = new StreamEvent();
-        event.setStreamId("sampleStream");
-        event.setTimestamp(System.currentTimeMillis());
-        event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0});
-        partitionedEvent.setEvent(event);
-
-        int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
-        LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent);
-
-        int compactLength = 0;
-        compactLength += "sampleStream".getBytes().length;
-        compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length;
-        compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length;
-        compactLength += "CPU".getBytes().length;
-        compactLength += "LOCALHOST".getBytes().length;
-        compactLength += 1;
-        compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
-        compactLength += ByteUtils.doubleToBytes(60.0).length;
-
-        LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent);
-        Assert.assertTrue(compactLength * 20 < javaSerializationLength);
-    }
-
-
-    public static StreamDefinition createSampleStreamDefinition(String streamId){
-        StreamDefinition sampleStreamDefinition = new StreamDefinition();
-        sampleStreamDefinition.setStreamId(streamId);
-        sampleStreamDefinition.setTimeseries(true);
-        sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for "+streamId);
-        List<StreamColumn> streamColumns = new ArrayList<>();
-
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
-        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
-        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        sampleStreamDefinition.setColumns(streamColumns);
-        return sampleStreamDefinition;
-    }
-
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
-        StreamPartition streamPartition = new StreamPartition();
-        streamPartition.setStreamId(streamId);
-        streamPartition.setColumns(groupByField);
-        streamPartition.setType(StreamPartition.Type.GROUPBY);
-        return streamPartition;
-    }
-
-    @SuppressWarnings("serial")
-    public static PartitionedEvent createSimpleStreamEvent()  {
-        StreamEvent event = StreamEvent.Builder()
-                    .schema(createSampleStreamDefinition("sampleStream_1"))
-                    .streamId("sampleStream_1")
-                    .timestamep(System.currentTimeMillis())
-                    .attributes(new HashMap<String,Object>(){{
-                        put("name","cpu");
-                        put("host","localhost");
-                        put("flag",true);
-                        put("value",60.0);
-                        put("data",Long.MAX_VALUE);
-                        put("unknown","unknown column value");
-                    }}).build();
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host")));
-        return pEvent;
-    }
-}
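
The hand-summed byte counts above correspond to a straightforward binary layout.
As a hypothetical illustration (this is not the wire format of the module's
serializer; the field order and the int length prefixes for strings are
assumptions), the same event could be packed with a plain ByteBuffer:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    final class CompactEventCodec {
        static byte[] encode(int streamIdHash, int partitionHash, long timestamp,
                             String name, String host, boolean flag, long data, double value) {
            byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
            byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(
                    4 + 4 + 8                        // streamId hash, partition hash, timestamp
                    + 4 + nameBytes.length           // length-prefixed name
                    + 4 + hostBytes.length           // length-prefixed host
                    + 1 + 8 + 8);                    // flag, data, value
            buf.putInt(streamIdHash).putInt(partitionHash).putLong(timestamp);
            buf.putInt(nameBytes.length).put(nameBytes);
            buf.putInt(hostBytes.length).put(hostBytes);
            buf.put((byte) (flag ? 1 : 0)).putLong(data).putDouble(value);
            return buf.array();
        }
    }

For the event {"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0} this layout takes 53
bytes, and the assertion above requires the java.io form to be at least 20 times
larger than the compact estimate.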

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
deleted file mode 100644
index 0347d50..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import com.esotericsoftware.kryo.*;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.BitSet;
-
-
-public class PartitionedEventSerializerTest {
-    private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
-    @Test
-    public void testPartitionEventSerialization() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());
-        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
-        ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-        serializer.serialize(partitionedEvent,dataOutput1);
-        byte[] serializedBytes = dataOutput1.toByteArray();
-        PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-        Assert.assertEquals(partitionedEvent,deserializedEvent);
-
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
-
-        byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
-        PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-        Assert.assertEquals(partitionedEvent,deserializedEventCompressed);
-
-        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
-        ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-        serializer2.serialize(partitionedEvent,dataOutput2);
-        byte[] serializedBytes2 = dataOutput2.toByteArray();
-        ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
-        PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-        Assert.assertEquals(partitionedEvent,deserializedEvent2);
-
-        byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        Output output = new Output(10000);
-        kryo.writeClassAndObject(output,partitionedEvent);
-        byte[] kryoBytes = output.toBytes();
-        Input input = new Input(kryoBytes);
-        PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-        Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
-        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
-    }
-    @Test
-    public void testPartitionEventSerializationEfficiency() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());
-        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
-        int count = 100000;
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        int i = 0;
-        while(i<count) {
-            ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-            serializer.serialize(partitionedEvent, dataOutput1);
-            byte[] serializedBytes = dataOutput1.toByteArray();
-            PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-            Assert.assertEquals(partitionedEvent, deserializedEvent);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Cached Stream: {} ms",stopWatch.getTime());
-        stopWatch.reset();
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
-        i = 0;
-        stopWatch.start();
-        while(i<count) {
-            byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
-            PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-            Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime());
-        stopWatch.reset();
-
-        i = 0;
-        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
-        stopWatch.start();
-        while(i<count) {
-            ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-            serializer2.serialize(partitionedEvent, dataOutput2);
-            byte[] serializedBytes2 = dataOutput2.toByteArray();
-            ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
-            PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-            Assert.assertEquals(partitionedEvent, deserializedEvent2);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime());
-        stopWatch.reset();
-        i = 0;
-        DefaultSerializationDelegate javaSerDe = new DefaultSerializationDelegate();
-        stopWatch.start();
-        while(i<count) {
-            byte[] javaSerialization = javaSerDe.serialize(partitionedEvent);
-            PartitionedEvent javaSerializedEvent = (PartitionedEvent) javaSerDe.deserialize(javaSerialization);
-            Assert.assertEquals(partitionedEvent, javaSerializedEvent);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Java Native: {} ms",stopWatch.getTime());
-        stopWatch.reset();
-        i = 0;
-        stopWatch.start();
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        while(i<count) {
-            Output output = new Output(10000);
-            kryo.writeClassAndObject(output, partitionedEvent);
-            byte[] kryoBytes = output.toBytes();
-            Input input = new Input(kryoBytes);
-            PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-            Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
-            i++;
-        }
-        stopWatch.stop();
-        LOG.info("Kryo: {} ms",stopWatch.getTime());
-    }
-
-    /**
-     * Kryo Serialization Length = Length of byte[] + 2
-     */
-    @Test
-    public void testKryoByteArraySerialization(){
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9};
-        Output output = new Output(1000);
-        kryo.writeObject(output,bytes);
-        Assert.assertEquals(bytes.length + 2,output.toBytes().length);
-    }
-
-    private byte[] kryoSerialize(Object object){
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        Output output = new Output(100000);
-        kryo.writeClassAndObject(output,object);
-        return output.toBytes();
-    }
-
-    @Test
-    public void testBitSet(){
-        BitSet bitSet = new BitSet();
-        bitSet.set(0,true); // 1
-        bitSet.set(1,false); // 0
-        bitSet.set(2,true); // 1
-        LOG.info("Bit Set Size: {}",bitSet.size());
-        LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray());
-        LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray());
-        LOG.info("BitSet[0]: {}",bitSet.get(0));
-        LOG.info("BitSet[1]: {}",bitSet.get(1));
-        LOG.info("BitSet[1]: {}",bitSet.get(2));
-
-        byte[] bytes = bitSet.toByteArray();
-
-        BitSet bitSet2 = BitSet.valueOf(bytes);
-
-        LOG.info("Bit Set Size: {}",bitSet2.size());
-        LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray());
-        LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray());
-        LOG.info("BitSet[0]: {}",bitSet2.get(0));
-        LOG.info("BitSet[1]: {}",bitSet2.get(1));
-        LOG.info("BitSet[1]: {}",bitSet2.get(2));
-
-
-        BitSet bitSet3 = new BitSet();
-        bitSet3.set(0,true);
-        Assert.assertEquals(1,bitSet3.length());
-
-        BitSet bitSet4 = new BitSet();
-        bitSet4.set(0,false);
-        Assert.assertEquals(0,bitSet4.length());
-        Assert.assertFalse(bitSet4.get(1));
-        Assert.assertFalse(bitSet4.get(2));
-    }
-
-    @Test
-    public void testPeriod(){
-        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
-        Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000)));
-        Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString());
-    }
-
-    @Test
-    public void testPartitionType(){
-
-    }
-}
\ No newline at end of file
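
The BitSet experiments above are the building block for sparse field encoding:
a small bitmap header records which columns carry a value, so null columns cost
no payload bytes and trailing nulls vanish entirely (BitSet.toByteArray()
truncates trailing zero bits, as the assertions show). A minimal sketch of the
idea, with invented names:

    import java.util.BitSet;

    final class NullMask {
        /** Build a bitmap where bit i is set iff column i is non-null. */
        static byte[] encode(Object[] columns) {
            BitSet mask = new BitSet(columns.length);
            for (int i = 0; i < columns.length; i++) {
                mask.set(i, columns[i] != null);
            }
            return mask.toByteArray();  // trailing zero bits are truncated
        }

        /** Restore the bitmap; absent trailing bits simply read back as false. */
        static BitSet decode(byte[] bytes) {
            return BitSet.valueOf(bytes);
        }
    }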

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
deleted file mode 100644
index f2f3b46..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-/**
- * @since Apr 1, 2016
- *
- */
-public class AttributeCollectAggregatorTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
-
-    @Test
-    public void test() throws Exception {
-        String ql = "define stream s1(timestamp long, host string, type string);";
-        ql += " from s1#window.externalTime(timestamp, 1 sec)";
-        ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
-
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
-
-        InputHandler input = runtime.getInputHandler("s1");
-        runtime.addCallback("output", new StreamCallback() {
-
-            @Override
-            public void receive(Event[] arg0) {
-                logger.info("output event length:" + arg0.length);
-
-                for (Event e : arg0) {
-                    StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
-                    for (Object o : e.getData()) {
-                        sb.append("," + o);
-                    }
-                    logger.info(sb.toString());
-                }
-                logger.info("===end===");
-            }
-        });
-//        StreamDefinition definition = (StreamDefinition) runtime.getStreamDefinitionMap().get("output");
-
-        runtime.start();
-
-        Event[] events = generateEvents();
-        for (Event e : events) {
-            input.send(e);
-        }
-
-        Thread.sleep(1000);
-
-    }
-
-    private Event[] generateEvents() {
-        List<Event> events = new LinkedList<Event>();
-
-        Random r = new Random();
-        Event e = null;
-        long base = System.currentTimeMillis();
-        {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
-            base += 100;
-            events.add(e);
-        }
-
-        {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
-            base += 100;
-            events.add(e);
-        }
-
-        base += 10000;
-        {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
-            base += 100;
-            events.add(e);
-        }
-
-        {
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
-            base += 100;
-            events.add(e);
-            e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
-            base += 100;
-            events.add(e);
-        }
-        base += 10000;
-        e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" });
-        events.add(e);
-
-        return events.toArray(new Event[0]);
-    }
-    
-    @Test
-    public void testQuery() {
-        String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
-        ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
-
-        SiddhiManager sm = new SiddhiManager();
-        sm.createExecutionPlanRuntime(ql);
-    }
-}
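
A note for readers of the deleted test above: #window.externalTime advances on
the event's own timestamp attribute rather than on wall-clock time, which is why
the test can replay synthetic timestamps deterministically. Below is a
self-contained sketch against the same Siddhi 3.x API; it uses only the built-in
count() aggregator so that it runs without the eagle:collect extension, and all
stream and field names are illustrative.

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;

    public class ExternalTimeWindowSketch {
        public static void main(String[] args) throws Exception {
            String plan = "define stream s1 (timestamp long, host string);"
                    + " from s1#window.externalTime(timestamp, 1 sec)"
                    + " select host, count() as cnt group by host insert into out;";
            SiddhiManager sm = new SiddhiManager();
            ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(plan);
            runtime.addCallback("out", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event e : events) {
                        System.out.println(e);  // counts per host within the 1s event-time window
                    }
                }
            });
            runtime.start();
            InputHandler in = runtime.getInputHandler("s1");
            long t = System.currentTimeMillis();
            in.send(new Event(t, new Object[] { t, "host1" }));                // window opens at t
            in.send(new Event(t + 100, new Object[] { t + 100, "host1" }));
            in.send(new Event(t + 2000, new Object[] { t + 2000, "host2" })); // expires the first window
            Thread.sleep(500);
            runtime.shutdown();
        }
    }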

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
deleted file mode 100644
index 613be00..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
-
-public class MapDBTestSuite {
-    @Test
-    public void testOnHeapDB(){
-        DB db = DBMaker.heapDB().make();
-        BTreeMap<Long,String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
-        Assert.assertFalse(map.putIfAbsentBoolean(1L,"val_1"));
-        Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_2"));
-        Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_3"));
-        Assert.assertFalse(map.putIfAbsentBoolean(2L,"val_4"));
-
-        Assert.assertEquals("val_1",map.get(1L));
-        Assert.assertEquals("val_4",map.get(2L));
-
-        Assert.assertTrue(map.replace(2L,"val_4","val_5"));
-        Assert.assertEquals("val_5",map.get(2L));
-
-        map.close();
-        db.close();
-    }
-}
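
The heap-backed test above exercises the same BTreeMap API that a file-backed
store exposes; spilling to a file is what lets large sort windows live off the
JVM heap. A minimal sketch assuming the MapDB 3.x API this module depends on
(the temp-file path is illustrative):

    import java.io.File;

    import org.mapdb.BTreeMap;
    import org.mapdb.DB;
    import org.mapdb.DBMaker;
    import org.mapdb.Serializer;

    public class MapDBFileSketch {
        public static void main(String[] args) {
            File file = new File(System.getProperty("java.io.tmpdir"), "sort-window.db");
            DB db = DBMaker.fileDB(file).fileMmapEnableIfSupported().make();
            BTreeMap<Long, String> map = db.treeMap("btree")
                    .keySerializer(Serializer.LONG)
                    .valueSerializer(Serializer.STRING)
                    .createOrOpen();
            map.put(1L, "val_1");
            System.out.println(map.get(1L));    // prints: val_1
            map.close();
            db.close();
            file.delete();                      // remove the temp store
        }
    }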