You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:28 UTC

[20/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
deleted file mode 100755
index 7a503e1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
-import com.google.common.base.Preconditions;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * ===== Benchmark Result Report =====<br/><br/>
- *
- * <p>Num. Operation   Type                            Time<br/>
- * ---- ---------   ----                            ----<br/>
- * 1000    FlushTime    DIRECT_MEMORY                :    55<br/>
- * 1000    FlushTime    FILE_RAF                     :    63<br/>
- * 1000    FlushTime    MEMORY                       :    146<br/>
- * 1000    FlushTime    ONHEAP                       :    17<br/>
- * 1000    InsertTime    DIRECT_MEMORY               :    68<br/>
- * 1000    InsertTime    FILE_RAF                    :    223<br/>
- * 1000    InsertTime    MEMORY                      :    273<br/>
- * 1000    InsertTime    ONHEAP                      :    20<br/>
- * 10000    FlushTime    DIRECT_MEMORY               :    551<br/>
- * 10000    FlushTime    FILE_RAF                    :    668<br/>
- * 10000    FlushTime    MEMORY                      :    643<br/>
- * 10000    FlushTime    ONHEAP                      :    5<br/>
- * 10000    InsertTime    DIRECT_MEMORY              :    446<br/>
- * 10000    InsertTime    FILE_RAF                   :    2095<br/>
- * 10000    InsertTime    MEMORY                     :    784<br/>
- * 10000    InsertTime    ONHEAP                     :    29<br/>
- * 100000    FlushTime    DIRECT_MEMORY              :    6139<br/>
- * 100000    FlushTime    FILE_RAF                   :    6237<br/>
- * 100000    FlushTime    MEMORY                     :    6238<br/>
- * 100000    FlushTime    ONHEAP                     :    18<br/>
- * 100000    InsertTime    DIRECT_MEMORY             :    4499<br/>
- * 100000    InsertTime    FILE_RAF                  :    22343<br/>
- * 100000    InsertTime    MEMORY                    :    4962<br/>
- * 100000    InsertTime    ONHEAP                    :    107<br/>
- * 1000000    FlushTime    DIRECT_MEMORY             :    61356<br/>
- * 1000000    FlushTime    FILE_RAF                  :    63025<br/>
- * 1000000    FlushTime    MEMORY                    :    61380<br/>
- * 1000000    FlushTime    ONHEAP                    :    47<br/>
- * 1000000    InsertTime    DIRECT_MEMORY            :    43637<br/>
- * 1000000    InsertTime    FILE_RAF                 :    464481<br/>
- * 1000000    InsertTime    MEMORY                   :    44367<br/>
- * 1000000    InsertTime    ONHEAP                   :    2040<br/>
- * </p>
- * @see StreamSortedWindowOnHeap
- * @see org.mapdb.DBMaker
- */
-public class StreamWindowRepository {
-    public enum StorageType {
-        /**
-         * Creates new in-memory database which stores all data on heap without serialization.
-         * This mode should be very fast, but data will affect Garbage PartitionedEventCollector the same way as traditional Java Collections.
-         */
-        ONHEAP,
-
-        /**
-         * Creates new in-memory database. Changes are lost after JVM exits.
-         * This option serializes data into {@code byte[]},
-         * so they are not affected by Garbage PartitionedEventCollector.
-         */
-        MEMORY,
-
-        /**
-         * <p>
-         * Creates new in-memory database. Changes are lost after JVM exits.
-         * </p><p>
-         * This will use {@code DirectByteBuffer} outside of HEAP, so Garbage Collector is not affected
-         * You should increase ammount of direct memory with
-         * {@code -XX:MaxDirectMemorySize=10G} JVM param
-         * </p>
-         */
-        DIRECT_MEMORY,
-
-        /**
-         * By default use File.createTempFile("streamwindows","temp")
-         */
-        FILE_RAF
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
-    private final Map<StorageType, DB> dbPool;
-
-    private StreamWindowRepository() {
-        dbPool = new HashMap<>();
-    }
-
-    private static StreamWindowRepository repository;
-
-    /**
-     * Close automatically when JVM exists.
-     *
-     * @return StreamWindowRepository singletonInstance
-     */
-    public static StreamWindowRepository getSingletonInstance() {
-        synchronized (StreamWindowRepository.class) {
-            if (repository == null) {
-                repository = new StreamWindowRepository();
-                Runtime.getRuntime().addShutdownHook(new Thread() {
-                    @Override
-                    public void run() {
-                        repository.close();
-                    }
-                });
-            }
-            return repository;
-        }
-    }
-
-    private DB createMapDB(StorageType storageType) {
-        synchronized (dbPool) {
-            if (!dbPool.containsKey(storageType)) {
-                DB db;
-                switch (storageType) {
-                    case ONHEAP:
-                        db = DBMaker.heapDB().closeOnJvmShutdown().make();
-                        LOG.info("Create ONHEAP mapdb");
-                        break;
-                    case MEMORY:
-                        db = DBMaker.memoryDB().closeOnJvmShutdown().make();
-                        LOG.info("Create MEMORY mapdb");
-                        break;
-                    case DIRECT_MEMORY:
-                        db = DBMaker.memoryDirectDB().closeOnJvmShutdown().make();
-                        LOG.info("Create DIRECT_MEMORY mapdb");
-                        break;
-                    case FILE_RAF:
-                        try {
-                            File file = File.createTempFile("window-", ".map");
-                            file.delete();
-                            file.deleteOnExit();
-                            Preconditions.checkNotNull(file, "file is null");
-                            db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
-                            LOG.info("Created FILE_RAF map file at {}", file.getAbsolutePath());
-                        } catch (IOException e) {
-                            throw new IllegalStateException(e);
-                        }
-                        break;
-                    default:
-                        throw new IllegalArgumentException("Illegal storage type: " + storageType);
-                }
-                dbPool.put(storageType, db);
-                return db;
-            }
-            return dbPool.get(storageType);
-        }
-    }
-
-    public StreamWindow createWindow(long start, long end, long margin, StorageType type) {
-        StreamWindow ret;
-        switch (type) {
-            case ONHEAP:
-                ret = new StreamSortedWindowOnHeap(start, end, margin);
-                break;
-            default:
-                ret = new StreamSortedWindowInMapDB(
-                    start, end, margin,
-                    createMapDB(type),
-                    UUID.randomUUID().toString()
-                );
-                break;
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Created new {}, type: {}", ret, type);
-        }
-        return ret;
-    }
-
-    public StreamWindow createWindow(long start, long end, long margin, StreamWindowStrategy strategy) {
-        return strategy.createWindow(start, end, margin, this);
-    }
-
-    public StreamWindow createWindow(long start, long end, long margin) {
-        return OnHeapStrategy.INSTANCE.createWindow(start, end, margin, this);
-    }
-
-    public void close() {
-        for (Map.Entry<StorageType, DB> entry : dbPool.entrySet()) {
-            entry.getValue().close();
-        }
-        dbPool.clear();
-    }
-
-    public interface StreamWindowStrategy {
-        StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository);
-    }
-
-    public static class OnHeapStrategy implements StreamWindowStrategy {
-        public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();
-
-        @Override
-        public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
-            return repository.createWindow(start, end, margin, StorageType.ONHEAP);
-        }
-    }
-
-    public static class WindowSizeStrategy implements StreamWindowStrategy {
-        private static final long ONE_HOUR = 3600 * 1000;
-        private static final long FIVE_HOURS = 5 * 3600 * 1000;
-        private final long onheapWindowSizeLimit;
-        private final long offheapWindowSizeLimit;
-
-        public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR, FIVE_HOURS);
-
-        public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit) {
-            this.offheapWindowSizeLimit = offheapWindowSizeLimit;
-            this.onheapWindowSizeLimit = onheapWindowSizeLimit;
-
-            if (this.offheapWindowSizeLimit < this.onheapWindowSizeLimit) {
-                throw new IllegalStateException("offheapWindowSizeLimit " + this.offheapWindowSizeLimit + " < onheapWindowSizeLimit " + this.onheapWindowSizeLimit);
-            }
-        }
-
-        @Override
-        public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
-            long windowLength = end - start;
-            if (windowLength <= onheapWindowSizeLimit) {
-                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.ONHEAP);
-            } else if (windowLength > onheapWindowSizeLimit & windowLength <= offheapWindowSizeLimit) {
-                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
-            } else {
-                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.FILE_RAF);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
deleted file mode 100644
index 73adee6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.utils.SerializableUtils;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.jetbrains.annotations.NotNull;
-import org.mapdb.DataInput2;
-import org.mapdb.DataOutput2;
-import org.mapdb.serializer.GroupSerializerObjectArray;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @deprecated performance is worse, should investigate.
- */
-public class CachedEventGroupSerializer extends GroupSerializerObjectArray<PartitionedEvent[]> {
-    private Map<Integer, StreamPartition> hashCodePartitionDict = new HashMap<>();
-
-    private void writePartitionedEvent(DataOutput2 out, PartitionedEvent event) throws IOException {
-        out.packLong(event.getPartitionKey());
-        int partitionHashCode = 0;
-        if (event.getPartition() != null) {
-            partitionHashCode = event.getPartition().hashCode();
-            if (!hashCodePartitionDict.containsKey(partitionHashCode)) {
-                hashCodePartitionDict.put(partitionHashCode, event.getPartition());
-            }
-        }
-        out.packInt(partitionHashCode);
-        if (event.getEvent() != null) {
-            byte[] eventBytes = SerializableUtils.serializeToCompressedByteArray(event.getEvent());
-            out.packInt(eventBytes.length);
-            out.write(eventBytes);
-        } else {
-            out.packInt(0);
-        }
-    }
-
-    private PartitionedEvent readPartitionedEvent(DataInput2 in) throws IOException {
-        PartitionedEvent event = new PartitionedEvent();
-        event.setPartitionKey(in.unpackLong());
-        int partitionHashCode = in.unpackInt();
-        if (partitionHashCode != 0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
-            event.setPartition(hashCodePartitionDict.get(partitionHashCode));
-        }
-        int eventBytesLen = in.unpackInt();
-        if (eventBytesLen > 0) {
-            byte[] eventBytes = new byte[eventBytesLen];
-            in.readFully(eventBytes);
-            event.setEvent((StreamEvent) SerializableUtils.deserializeFromCompressedByteArray(eventBytes, "Deserialize event from bytes"));
-        }
-        return event;
-    }
-
-    @Override
-    public void serialize(DataOutput2 out, PartitionedEvent[] value) throws IOException {
-        out.packInt(value.length);
-        for (PartitionedEvent event : value) {
-            writePartitionedEvent(out, event);
-        }
-    }
-
-    @Override
-    public PartitionedEvent[] deserialize(DataInput2 in, int available) throws IOException {
-        final int size = in.unpackInt();
-        PartitionedEvent[] ret = new PartitionedEvent[size];
-        for (int i = 0; i < size; i++) {
-            ret[i] = readPartitionedEvent(in);
-        }
-        return ret;
-    }
-
-    @Override
-    public boolean isTrusted() {
-        return true;
-    }
-
-    @Override
-    public boolean equals(PartitionedEvent[] a1, PartitionedEvent[] a2) {
-        return a1[0].getTimestamp() == a2[0].getTimestamp();
-    }
-
-    @Override
-    public int hashCode(@NotNull PartitionedEvent[] events, int seed) {
-        return new HashCodeBuilder().append(events).toHashCode();
-    }
-
-    @Override
-    public int compare(PartitionedEvent[] o1, PartitionedEvent[] o2) {
-        if (o1.length > 0 && o2.length > 0) {
-            return (int) (o1[0].getTimestamp() - o2[0].getTimestamp());
-        } else {
-            return 0;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
deleted file mode 100644
index 55efcaf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.utils.SerializableUtils;
-import org.jetbrains.annotations.NotNull;
-import org.mapdb.DataInput2;
-import org.mapdb.DataOutput2;
-import org.mapdb.Serializer;
-import org.mapdb.serializer.GroupSerializer;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-
-public class PartitionedEventGroupSerializer implements GroupSerializer<PartitionedEvent[]> {
-    private static final GroupSerializer<byte[]> delegate = Serializer.BYTE_ARRAY;
-
-    @Override
-    public int valueArraySearch(Object keys, PartitionedEvent[] key) {
-        return delegate.valueArraySearch(keys, serialize(key));
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public int valueArraySearch(Object keys, PartitionedEvent[] key, Comparator comparator) {
-        return delegate.valueArraySearch(keys, serialize(key), comparator);
-    }
-
-    @Override
-    public void valueArraySerialize(DataOutput2 out, Object vals) throws IOException {
-        delegate.valueArraySerialize(out, vals);
-    }
-
-    @Override
-    public Object valueArrayDeserialize(DataInput2 in, int size) throws IOException {
-        return delegate.valueArrayDeserialize(in, size);
-    }
-
-    @Override
-    public PartitionedEvent[] valueArrayGet(Object vals, int pos) {
-        return deserialize(delegate.valueArrayGet(vals, pos));
-    }
-
-    @Override
-    public int valueArraySize(Object vals) {
-        return delegate.valueArraySize(vals);
-    }
-
-    @Override
-    public Object valueArrayEmpty() {
-        return delegate.valueArrayEmpty();
-    }
-
-    @Override
-    public Object valueArrayPut(Object vals, int pos, PartitionedEvent[] newValue) {
-        return delegate.valueArrayPut(vals, pos, serialize(newValue));
-    }
-
-    @Override
-    public Object valueArrayUpdateVal(Object vals, int pos, PartitionedEvent[] newValue) {
-        return delegate.valueArrayUpdateVal(vals, pos, serialize(newValue));
-    }
-
-    @Override
-    public Object valueArrayFromArray(Object[] objects) {
-        return delegate.valueArrayFromArray(objects);
-    }
-
-    @Override
-    public Object valueArrayCopyOfRange(Object vals, int from, int to) {
-        return delegate.valueArrayCopyOfRange(vals, from, to);
-    }
-
-    @Override
-    public Object valueArrayDeleteValue(Object vals, int pos) {
-        return delegate.valueArrayDeleteValue(vals, pos);
-    }
-
-    @Override
-    public void serialize(@NotNull DataOutput2 out, @NotNull PartitionedEvent[] value) throws IOException {
-        delegate.serialize(out, serialize(value));
-    }
-
-    private static byte[] serialize(PartitionedEvent[] events) {
-        return SerializableUtils.serializeToCompressedByteArray(events);
-    }
-
-    @Override
-    public PartitionedEvent[] deserialize(@NotNull DataInput2 input, int available) throws IOException {
-        return deserialize(delegate.deserialize(input, available));
-    }
-
-    private static PartitionedEvent[] deserialize(byte[] bytes) {
-        return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes, "deserialize as stream event");
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
deleted file mode 100644
index 5378c67..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.util.Comparator;
-import java.util.Objects;
-
-/**
- * TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?.
- */
-public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
-    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();
-
-    @Override
-    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
-        if (Objects.equals(o1, o2)) {
-            return 0;
-        } else {
-            if (o1 == null && o2 == null) {
-                return 0;
-            } else if (o1 != null && o2 == null) {
-                return 1;
-            } else if (o1 == null) {
-                return -1;
-            }
-            // Unstable Sorting Algorithm
-            if (o1.getTimestamp() <= o2.getTimestamp()) {
-                return -1;
-            } else {
-                return 1;
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
deleted file mode 100644
index fb5ba72..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamSortHandler;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamWindow;
-import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class StreamSortWindowHandlerImpl implements StreamSortHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
-    private StreamWindowManager windowManager;
-    private StreamSortSpec streamSortSpecSpec;
-    private PartitionedEventCollector outputCollector;
-    private String streamId;
-
-    public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
-        this.windowManager = new StreamWindowManagerImpl(
-            Period.parse(streamSortSpecSpec.getWindowPeriod()),
-            streamSortSpecSpec.getWindowMargin(),
-            PartitionedEventTimeOrderingComparator.INSTANCE,
-            outputCollector);
-        this.streamSortSpecSpec = streamSortSpecSpec;
-        this.streamId = streamId;
-        this.outputCollector = outputCollector;
-    }
-
-    /**
-     * Entry point to manage window lifecycle.
-     *
-     * @param event StreamEvent
-     */
-    public void nextEvent(PartitionedEvent event) {
-        final long eventTime = event.getEvent().getTimestamp();
-        boolean handled = false;
-
-        synchronized (this.windowManager) {
-            for (StreamWindow window : this.windowManager.getWindows()) {
-                if (window.alive() && window.add(event)) {
-                    handled = true;
-                }
-            }
-
-            // No window found for the event but not too late being rejected
-            if (!handled && !windowManager.reject(eventTime)) {
-                // later then all events, create later window
-                StreamWindow window = windowManager.addNewWindow(eventTime);
-                if (window.add(event)) {
-                    LOG.info("Created {} of {} at {}", window, this.streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(eventTime));
-                    handled = true;
-                }
-            }
-        }
-
-        if (!handled) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Drop expired event {}", event);
-            }
-            outputCollector.drop(event);
-        }
-    }
-
-    @Override
-    public void onTick(StreamTimeClock clock, long globalSystemTime) {
-        windowManager.onTick(clock, globalSystemTime);
-    }
-
-    @Override
-    public void close() {
-        try {
-            windowManager.close();
-        } catch (IOException e) {
-            LOG.error("Got exception while closing window manager", e);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return super.toString();
-    }
-
-    @Override
-    public int hashCode() {
-        if (streamSortSpecSpec == null) {
-            throw new NullPointerException("streamSortSpec is null");
-        } else {
-            return streamSortSpecSpec.hashCode();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
deleted file mode 100644
index 73a63b4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
-import org.apache.commons.lang3.time.StopWatch;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * StreamSortedWindow based on MapDB to support off-heap or disk storage.
- * Stable sorting algorithm
- * See <a href="http://www.mapdb.org">http://www.mapdb.org</a>
- */
-public class StreamSortedWindowInMapDB extends BaseStreamWindow {
-    private final String mapId;
-    private BTreeMap<Long, PartitionedEvent[]> btreeMap;
-    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
-    private final AtomicInteger size;
-    private long replaceOpCount = 0;
-    private static final PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();
-
-    /**
-     * @param mapId  physical map id, used to decide whether to reuse or not.
-     */
-    @SuppressWarnings("unused")
-    public StreamSortedWindowInMapDB(long start, long end, long margin, DB db, String mapId) {
-        super(start, end, margin);
-        this.mapId = mapId;
-        try {
-            btreeMap = db.<Long, StreamEvent>treeMap(mapId)
-                .keySerializer(Serializer.LONG)
-                .valueSerializer(STREAM_EVENT_GROUP_SERIALIZER)
-                .createOrOpen();
-            LOG.debug("Created BTree map {}", mapId);
-        } catch (Error error) {
-            LOG.info("Failed create BTree {}", mapId, error);
-        }
-        size = new AtomicInteger(0);
-    }
-
-    /**
-     * Assumed: most of adding operation will do putting only and few require replacing.
-     * <ol>
-     * <li>
-     * First of all, always try to put with created event directly
-     * </li>
-     * <li>
-     * If not absent (key already exists), then append and replace,
-     * replace operation will cause more consumption
-     * </li>
-     * </ol>
-     *
-     * @param event coming-in event
-     * @return whether success
-     */
-    @Override
-    public synchronized boolean add(PartitionedEvent event) {
-        long timestamp = event.getEvent().getTimestamp();
-        if (accept(timestamp)) {
-            boolean absent = btreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[] {event});
-            if (!absent) {
-                size.incrementAndGet();
-                return true;
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Duplicated timestamp {}, will reduce performance as replacing", timestamp);
-                }
-                PartitionedEvent[] oldValue = btreeMap.get(timestamp);
-                PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1] : Arrays.copyOf(oldValue, oldValue.length + 1);
-                newValue[newValue.length - 1] = event;
-                PartitionedEvent[] removedValue = btreeMap.replace(timestamp, newValue);
-                replaceOpCount++;
-                if (replaceOpCount % 1000 == 0) {
-                    LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance", replaceOpCount);
-                }
-                if (removedValue != null) {
-                    size.incrementAndGet();
-                } else {
-                    throw new IllegalStateException("Failed to replace key " + timestamp + " with " + newValue.length + " entities array to replace old " + oldValue.length + " entities array");
-                }
-                return true;
-            }
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    protected synchronized void flush(PartitionedEventCollector collector) {
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        btreeMap.valueIterator().forEachRemaining((events) -> {
-            for (PartitionedEvent event : events) {
-                collector.emit(event);
-            }
-        });
-        btreeMap.clear();
-        replaceOpCount = 0;
-        stopWatch.stop();
-        LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
-        size.set(0);
-    }
-
-    @Override
-    public synchronized void close() {
-        super.close();
-        btreeMap.close();
-        LOG.info("Closed {}", this.mapId);
-    }
-
-    @Override
-    public synchronized int size() {
-        return size.get();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
deleted file mode 100644
index ed000f1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
-
-import com.google.common.collect.TreeMultiset;
-import org.apache.commons.lang3.time.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Comparator;
-
-public class StreamSortedWindowOnHeap extends BaseStreamWindow {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
-    private final TreeMultiset<PartitionedEvent> treeMultisetCache;
-
-    /**
-     * @param start  start time.
-     * @param end    end time.
-     * @param margin margin time.
-     */
-    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator) {
-        super(start, end, margin);
-        treeMultisetCache = TreeMultiset.create(comparator);
-    }
-
-    public StreamSortedWindowOnHeap(long start, long end, long margin) {
-        this(start, end, margin, new PartitionedEventTimeOrderingComparator());
-    }
-
-    @Override
-    public boolean add(PartitionedEvent partitionedEvent) {
-        synchronized (treeMultisetCache) {
-            if (accept(partitionedEvent.getEvent().getTimestamp())) {
-                treeMultisetCache.add(partitionedEvent);
-                return true;
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("{} is not acceptable, ignored", partitionedEvent);
-                }
-                return false;
-            }
-        }
-    }
-
-    @Override
-    protected void flush(PartitionedEventCollector collector) {
-        synchronized (treeMultisetCache) {
-            StopWatch stopWatch = new StopWatch();
-            stopWatch.start();
-            treeMultisetCache.forEach(collector::emit);
-            int size = treeMultisetCache.size();
-            treeMultisetCache.clear();
-            stopWatch.stop();
-            LOG.info("Flushed {} events in {} ms from {}", size, stopWatch.getTime(), this.toString());
-        }
-    }
-
-    @Override
-    public int size() {
-        synchronized (treeMultisetCache) {
-            return treeMultisetCache.size();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
deleted file mode 100644
index e5be786..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.common.DateTimeUtil;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * In memory thread-safe time clock service.
- * TODO: maybe need to synchronize time clock globally, how to?
- */
-public class StreamTimeClockInLocalMemory implements StreamTimeClock {
-    private final AtomicLong currentTime;
-    private final String streamId;
-
-    public StreamTimeClockInLocalMemory(String streamId, long initialTime) {
-        this.streamId = streamId;
-        this.currentTime = new AtomicLong(initialTime);
-    }
-
-    public StreamTimeClockInLocalMemory(String streamId) {
-        this(streamId, 0L);
-    }
-
-    @Override
-    public void moveForward(long timestamp) {
-        if (timestamp < currentTime.get()) {
-            throw new IllegalArgumentException(timestamp + " < " + currentTime.get() + ", should not move time back");
-        }
-        this.currentTime.set(timestamp);
-    }
-
-    @Override
-    public String getStreamId() {
-        return streamId;
-    }
-
-    @Override
-    public long getTime() {
-        return currentTime.get();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamClock[streamId=%s, now=%s]", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
deleted file mode 100644
index b59918d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
-    private static final long serialVersionUID = -2770823821511195343L;
-    private static final Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
-    private final Map<String, StreamTimeClock> streamIdTimeClockMap;
-    private Timer timer;
-
-    private final Map<StreamTimeClockListener, String> listenerStreamIdMap;
-    private static final AtomicInteger num = new AtomicInteger();
-
-    public StreamTimeClockManagerImpl() {
-        listenerStreamIdMap = new HashMap<>();
-        streamIdTimeClockMap = new HashMap<>();
-        timer = new Timer("StreamScheduler-" + num.getAndIncrement());
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                // Make sure the timer tick happens one by one
-                triggerTickOnAll();
-            }
-        }, 1000, 1000);
-    }
-
-    /**
-     * By default, we could keep the current time clock in memory,
-     * Eventually we may need to consider the global time synchronization across all nodes
-     * 1) When to initialize window according to start time
-     * 2) When to close expired window according to current time
-     *
-     * @return StreamTimeClock instance.
-     */
-    @Override
-    public StreamTimeClock createStreamTimeClock(String streamId) {
-        synchronized (streamIdTimeClockMap) {
-            if (!streamIdTimeClockMap.containsKey(streamId)) {
-                StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
-                LOG.info("Created {}", instance);
-                streamIdTimeClockMap.put(streamId, instance);
-            } else {
-                LOG.warn("TimeClock for stream already existss: " + streamIdTimeClockMap.get(streamId));
-            }
-            return streamIdTimeClockMap.get(streamId);
-        }
-    }
-
-    @Override
-    public StreamTimeClock getStreamTimeClock(String streamId) {
-        synchronized (streamIdTimeClockMap) {
-            if (!streamIdTimeClockMap.containsKey(streamId)) {
-                LOG.warn("TimeClock for stream {} is not initialized before being called, create now", streamId);
-                return createStreamTimeClock(streamId);
-            }
-            return streamIdTimeClockMap.get(streamId);
-        }
-    }
-
-    @Override
-    public void removeStreamTimeClock(String streamId) {
-        synchronized (streamIdTimeClockMap) {
-            if (streamIdTimeClockMap.containsKey(streamId)) {
-                streamIdTimeClockMap.remove(streamId);
-                LOG.info("Removed TimeClock for stream {}: {}", streamId, streamIdTimeClockMap.get(streamId));
-            } else {
-                LOG.warn("No TimeClock found for stream {}, nothing to remove", streamId);
-            }
-        }
-    }
-
-    @Override
-    public void registerListener(String streamId, StreamTimeClockListener listener) {
-        synchronized (listenerStreamIdMap) {
-            if (listenerStreamIdMap.containsKey(listener)) {
-                throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
-            }
-            LOG.info("Register {} on {}", listener, streamId);
-            listenerStreamIdMap.put(listener, streamId);
-        }
-    }
-
-    @Override
-    public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
-        registerListener(streamClock.getStreamId(), listener);
-    }
-
-    @Override
-    public void removeListener(StreamTimeClockListener listener) {
-        listenerStreamIdMap.remove(listener);
-    }
-
-    @Override
-    public synchronized void triggerTickOn(String streamId) {
-        int count = 0;
-        for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
-            if (entry.getValue().equals(streamId)) {
-                entry.getKey().onTick(streamIdTimeClockMap.get(streamId), getCurrentSystemTime());
-                count++;
-            }
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
-        }
-    }
-
-    private static long getCurrentSystemTime() {
-        return System.currentTimeMillis();
-    }
-
-    @Override
-    public void onTimeUpdate(String streamId, long timestamp) {
-        StreamTimeClock timeClock = getStreamTimeClock(streamId);
-        if (timeClock == null) {
-            return;
-        }
-        // Trigger time clock only when time moves forward
-        if (timestamp >= timeClock.getTime()) {
-            timeClock.moveForward(timestamp);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Tick on stream {} with latest time {}", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
-            }
-            triggerTickOn(streamId);
-        }
-    }
-
-    private void triggerTickOnAll() {
-        synchronized (listenerStreamIdMap) {
-            for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
-                triggerTickOn(entry.getValue());
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        timer.cancel();
-        triggerTickOnAll();
-        LOG.info("Closed StreamTimeClockManager {}", this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
deleted file mode 100644
index 4c5154b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamWindow;
-import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
-import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class StreamWindowManagerImpl implements StreamWindowManager {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
-    private final TreeMap<Long, StreamWindow> windowBuckets;
-    private final PartitionedEventCollector collector;
-    private final Period windowPeriod;
-    private final long windowMargin;
-    @SuppressWarnings("unused")
-    private final Comparator<PartitionedEvent> comparator;
-    private long rejectTime;
-
-    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector) {
-        this.windowBuckets = new TreeMap<>();
-        this.windowPeriod = windowPeriod;
-        this.windowMargin = windowMargin;
-        this.collector = collector;
-        this.comparator = comparator;
-    }
-
-    @Override
-    public StreamWindow addNewWindow(long initialTime) {
-        synchronized (windowBuckets) {
-            if (!reject(initialTime)) {
-                Long windowStartTime = TimePeriodUtils.formatMillisecondsByPeriod(initialTime, windowPeriod);
-                Long windowEndTime = windowStartTime + TimePeriodUtils.getMillisecondsOfPeriod(windowPeriod);
-                StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(windowStartTime, windowEndTime, windowMargin);
-                window.register(collector);
-                addWindow(window);
-                return window;
-            } else {
-                throw new IllegalStateException("Failed to create new window, as "
-                    + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after "
-                    + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
-            }
-        }
-    }
-
-    private void addWindow(StreamWindow window) {
-        if (!windowBuckets.containsKey(window.startTime())) {
-            windowBuckets.put(window.startTime(), window);
-        } else {
-            throw new IllegalArgumentException("Duplicated " + window.toString());
-        }
-    }
-
-    @Override
-    public void removeWindow(StreamWindow window) {
-        synchronized (windowBuckets) {
-            windowBuckets.remove(window.startTime());
-        }
-    }
-
-    @Override
-    public boolean hasWindow(StreamWindow window) {
-        synchronized (windowBuckets) {
-            return windowBuckets.containsKey(window.startTime());
-        }
-    }
-
-    @Override
-    public boolean hasWindowFor(long timestamp) {
-        return getWindowFor(timestamp) != null;
-    }
-
-    @Override
-    public Collection<StreamWindow> getWindows() {
-        synchronized (windowBuckets) {
-            return windowBuckets.values();
-        }
-    }
-
-    @Override
-    public StreamWindow getWindowFor(long timestamp) {
-        synchronized (windowBuckets) {
-            for (StreamWindow windowBucket : windowBuckets.values()) {
-                if (timestamp >= windowBucket.startTime() && timestamp < windowBucket.endTime()) {
-                    return windowBucket;
-                }
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public boolean reject(long timestamp) {
-        return timestamp < rejectTime;
-    }
-
-    @Override
-    public void onTick(StreamTimeClock clock, long globalSystemTime) {
-        synchronized (windowBuckets) {
-            List<StreamWindow> toRemoved = new ArrayList<>();
-            List<StreamWindow> aliveWindow = new ArrayList<>();
-
-            for (StreamWindow windowBucket : windowBuckets.values()) {
-                windowBucket.onTick(clock, globalSystemTime);
-                if (windowBucket.rejectTime() > rejectTime) {
-                    rejectTime = windowBucket.rejectTime();
-                }
-            }
-            for (StreamWindow windowBucket : windowBuckets.values()) {
-                if (windowBucket.expired() || windowBucket.endTime() <= rejectTime) {
-                    toRemoved.add(windowBucket);
-                } else {
-                    aliveWindow.add(windowBucket);
-                }
-            }
-            toRemoved.forEach(this::closeAndRemoveWindow);
-            if (toRemoved.size() > 0) {
-                LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
-            }
-        }
-    }
-
-    private void closeAndRemoveWindow(StreamWindow windowBucket) {
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        closeWindow(windowBucket);
-        removeWindow(windowBucket);
-        stopWatch.stop();
-        LOG.info("Removed {} in {} ms", windowBucket, stopWatch.getTime());
-    }
-
-    private void closeWindow(StreamWindow windowBucket) {
-        windowBucket.close();
-    }
-
-    public void close() {
-        synchronized (windowBuckets) {
-            LOG.debug("Closing");
-            StopWatch stopWatch = new StopWatch();
-            stopWatch.start();
-            int count = 0;
-            for (StreamWindow windowBucket : getWindows()) {
-                count++;
-                closeWindow(windowBucket);
-            }
-            windowBuckets.clear();
-            stopWatch.stop();
-            LOG.info("Closed {} windows in {} ms", count, stopWatch.getTime());
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
deleted file mode 100644
index e9ee892..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.Scheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-
-import java.text.MessageFormat;
-import java.util.*;
-
-/**
- * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics
- * <p>1. onNewConfig() is interface for outside to update new metadata. Upon new metadata, this class will calculate if there is any new topic, removed topic or
- * updated topic</p>
- */
-public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener, SerializationMetadataProvider {
-    private static final long serialVersionUID = -5280723341236671580L;
-    private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class);
-
-    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
-    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";
-
-    // topic to KafkaSpoutWrapper
-    private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new HashMap<>();
-    private int numOfRouterBolts;
-
-    private SpoutSpec cachedSpoutSpec;
-
-    private transient KafkaSpoutMetric kafkaSpoutMetric;
-
-    @SuppressWarnings("rawtypes")
-    private Map conf;
-    private TopologyContext context;
-    private SpoutOutputCollector collector;
-    private final Config config;
-    private String topologyId;
-    private String spoutName;
-    private String routeBoltName;
-    @SuppressWarnings("unused")
-    private int taskIndex;
-    private IMetadataChangeNotifyService changeNotifyService;
-    private PartitionedEventSerializer serializer;
-    private volatile Map<String, StreamDefinition> sds;
-
-    /**
-     * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service.
-     *
-     * @param config
-     * @param topologyId
-     * @param changeNotifyService
-     * @param numOfRouterBolts
-     */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts) {
-        this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME);
-    }
-
-    /**
-     * @param config
-     * @param topologyId       used for distinguishing kafka offset for different topologies
-     * @param numOfRouterBolts used for generating streamId and routing
-     * @param spoutName        used for generating streamId between spout and router bolt
-     * @param routerBoltName   used for generating streamId between spout and router bolt.
-     */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName) {
-        this.config = config;
-        this.topologyId = topologyId;
-        this.changeNotifyService = changeNotifyService;
-        this.numOfRouterBolts = numOfRouterBolts;
-        this.spoutName = spoutName;
-        this.routeBoltName = routerBoltName;
-    }
-
-    public String getSpoutName() {
-        return spoutName;
-    }
-
-    public String getRouteBoltName() {
-        return routeBoltName;
-    }
-
-    /**
-     * the only output field is for StreamEvent.
-     *
-     * @param declarer
-     */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (int i = 0; i < numOfRouterBolts; i++) {
-            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i);
-            declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
-            LOG.info("declare stream between spout and streamRouterBolt " + streamId);
-        }
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("open method invoked");
-        }
-        this.conf = conf;
-        this.context = context;
-        this.collector = collector;
-        this.taskIndex = context.getThisTaskIndex();
-
-        // initialize an empty SpoutSpec
-        cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>());
-
-        changeNotifyService.registerListener(this);
-        changeNotifyService.init(config, MetadataType.SPOUT);
-
-        // register KafkaSpout metric
-        kafkaSpoutMetric = new KafkaSpoutMetric();
-        context.registerMetric("kafkaSpout", kafkaSpoutMetric, 60);
-
-        this.serializer = Serializers.newPartitionedEventSerializer(this);
-    }
-
-    @Override
-    public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
-        LOG.info("new metadata is updated " + spec);
-        try {
-            onReload(spec, sds);
-        } catch (Exception ex) {
-            LOG.error("error applying new SpoutSpec", ex);
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            try {
-                wrapper.nextTuple();
-            } catch (Exception e) {
-                LOG.error("unexpected exception is caught: {}", e.getMessage(), e);
-            }
-
-        }
-    }
-
-    /**
-     * find the correct wrapper to do ack that means msgId should be mapped to
-     * wrapper.
-     *
-     * @param msgId
-     */
-    @Override
-    public void ack(Object msgId) {
-        // decode and get topic
-        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
-        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        if (spout != null) {
-            spout.ack(id.id);
-        }
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        // decode and get topic
-        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
-        LOG.error("Failing message {}, with topic {}", msgId, id.topic);
-        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        if (spout != null) {
-            spout.fail(id.id);
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        System.out.println("deactivate");
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.deactivate();
-        }
-    }
-
-    @Override
-    public void close() {
-        System.out.println("close");
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.close();
-        }
-    }
-
-    private List<String> getTopics(SpoutSpec spoutSpec) {
-        List<String> meta = new ArrayList<String>();
-        for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
-            meta.add(entry.getTopic());
-        }
-        return meta;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception {
-        // calculate topic create/remove/update
-        List<String> topics = getTopics(newMeta);
-        List<String> cachedTopcies = getTopics(cachedSpoutSpec);
-        Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopcies);
-        Collection<String> removeTopics = CollectionUtils.subtract(cachedTopcies, topics);
-        Collection<String> updateTopics = CollectionUtils.intersection(topics, cachedTopcies);
-
-        LOG.info("Topics were added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics);
-
-        // build lookup table for scheme
-        Map<String, String> newSchemaName = new HashMap<String, String>();
-        Map<String, Map<String, String>> dataSourceProperties = new HashMap<>();
-        for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
-            newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
-            dataSourceProperties.put(ds.getTopic(), ds.getProperties());
-        }
-
-        // copy and swap
-        Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList);
-        // iterate new topics and then create KafkaSpout
-        for (String topic : newTopics) {
-            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
-            if (wrapper != null) {
-                LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
-                continue;
-            }
-            KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
-                    conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
-            newKafkaSpoutList.put(topic, newWrapper);
-        }
-        // iterate remove topics and then close KafkaSpout
-        for (String topic : removeTopics) {
-            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
-            if (wrapper == null) {
-                LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
-                continue;
-            }
-            removeKafkaSpout(wrapper);
-            newKafkaSpoutList.remove(topic);
-        }
-
-        // iterate update topic and then update metadata
-        for (String topic : updateTopics) {
-            KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
-            if (spoutWrapper == null) {
-                LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
-                continue;
-            }
-            spoutWrapper.update(newMeta, sds);
-        }
-
-        // swap
-        this.cachedSpoutSpec = newMeta;
-        this.kafkaSpoutList = newKafkaSpoutList;
-        this.sds = sds;
-    }
-
-    /**
-     * make this method protected to make sure unit test can work well
-     * Q: Where to persist consumer state, i.e. what offset has been consumed for each topic and partition
-     * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId
-     * Note1: PartitionManager.committedPath for composing zkState path,  _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
-     * consumerId by default is EagleConsumer unless it is specified by "stormKafkaEagleConsumer"
-     * Note2: put topologyId as part of zkState because one topic by design can be consumed by multiple topologies so one topology needs to know
-     * processed offset for itself
-     * <p>TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible
-     * </p>
-     *
-     * @param conf
-     * @param context
-     * @param collector
-     * @param topic
-     * @param spoutSpec
-     * @return
-     */
-    @SuppressWarnings("rawtypes")
-    protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
-                                                 String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
-        String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM);
-        BrokerHosts hosts = null;
-        if (configure.hasPath("spout.kafkaBrokerZkBasePath")) {
-            hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH));
-        } else {
-            hosts = new ZkHosts(kafkaBrokerZkQuorum);
-        }
-        String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
-        if (configure.hasPath("spout.stormKafkaTransactionZkPath")) {
-            transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath");
-        }
-        boolean logEventEnabled = false;
-        if (configure.hasPath("topology.logEventEnabled")) {
-            logEventEnabled = configure.getBoolean("topology.logEventEnabled");
-        }
-        // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
-        String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
-        if (configure.hasPath("spout.stormKafkaEagleConsumer")) {
-            zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer");
-        }
-        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
-        // transaction zkServers
-        boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
-        if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
-            ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
-            spoutConfig.zkServers = utils.getZkHosts();
-            spoutConfig.zkPort = utils.getZkPort();
-        } else {
-            ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum"));
-            spoutConfig.zkServers = utils.getZkHosts();
-            spoutConfig.zkPort = utils.getZkPort();
-        }
-        // transaction update interval
-        spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
-        // Kafka fetch size
-        spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
-        // "startOffsetTime" is for test usage, prod should not use this
-        if (configure.hasPath("spout.stormKafkaStartOffsetTime")) {
-            spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime");
-        }
-
-        spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
-        KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
-        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer, logEventEnabled);
-        wrapper.open(conf, context, collectorWrapper);
-
-        if (LOG.isInfoEnabled()) {
-            LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
-        }
-        return wrapper;
-    }
-
-    private MultiScheme createMultiScheme(Map conf, String topic, String schemeClsName) throws Exception {
-        Object scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, conf);
-        if (scheme instanceof MultiScheme) {
-            return (MultiScheme) scheme;
-        } else if (scheme instanceof Scheme) {
-            return new SchemeAsMultiScheme((Scheme) scheme);
-        } else {
-            LOG.error("create spout scheme failed.");
-            throw new IllegalArgumentException("create spout scheme failed.");
-        }
-    }
-
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        return sds.get(streamId);
-    }
-
-    /**
-     * utility to get list of zkServers and zkPort.(It is assumed that zkPort is same for all zkServers as storm-kafka library requires this though it is not efficient)
-     */
-    private static class ZkServerPortUtils {
-        private List<String> zkHosts = new ArrayList<>();
-        private Integer zkPort;
-
-        public ZkServerPortUtils(String zkQuorum) {
-            String[] zkConnections = zkQuorum.split(",");
-            for (String zkConnection : zkConnections) {
-                zkHosts.add(zkConnection.split(":")[0]);
-            }
-            zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
-        }
-
-        public List<String> getZkHosts() {
-            return zkHosts;
-        }
-
-        public Integer getZkPort() {
-            return zkPort;
-        }
-    }
-
-    protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
-        try {
-            wrapper.close();
-        } catch (Exception e) {
-            LOG.error("Close wrapper failed. Ignore and continue!", e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
deleted file mode 100644
index 5b7e542..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import org.slf4j.Logger;
-
-/**
- * normally this is used in unit test for convenience.
- */
-public class CreateTopicUtils {
-
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class);
-
-    private static final int partitions = 2;
-    private static final int replicationFactor = 1;
-
-    public static void ensureTopicReady(String zkQuorum, String topic) {
-        //        ZkConnection zkConnection = new ZkConnection(zkQuorum);
-        //        ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
-        ////        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-        //        if (!AdminUtils.topicExists(zkClient, topic)) {
-        //            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor "
-        //                    + replicationFactor);
-        //            AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-        //        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
deleted file mode 100644
index 3c8c99d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.spout;
-
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * topic to stream metadata lifecycle method
- * one topic may spawn multiple streams, the metadata change includes
- * 1. add/remove stream
- * 2. for a specific stream, groupingstrategy is changed
- * ex1, this stream has more alert bolts than before, then this spout would take more traffic
- * ex2, this stream has less alert bolts than before, then this spout would take less traffic
- */
-public interface ISpoutSpecLCM {
-    /**
-     * stream metadata is used for SPOUT to filter traffic and route traffic to following groupby bolts.
-     *
-     * @param metadata
-     */
-    void update(SpoutSpec metadata, Map<String, StreamDefinition> sds);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
deleted file mode 100644
index 74dea03..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.common.DateTimeUtil;
-
-/**
- * Created on 2/18/16.
- */
-public class KafkaMessageIdWrapper {
-    public Object id;
-    public String topic;
-    public long timestamp;
-
-    public KafkaMessageIdWrapper(Object o) {
-        this.id = o;
-    }
-
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-
-    public String toString() {
-        try {
-            return String.format("KafkaMessageIdWrapper[topic=%s, id=%s, timestamp=%s %s]",
-                    topic,
-                    objectMapper.writeValueAsString(id),
-                    DateTimeUtil.millisecondsToHumanDateWithSeconds(timestamp),
-                    DateTimeUtil.CURRENT_TIME_ZONE.getID());
-        } catch (JsonProcessingException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
deleted file mode 100644
index 440db0a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.spout;
-
-import java.util.Map;
-
-
-/**
- * All Scheme implementations should have the following conditions
- * 1) implement Scheme interface
- * 2) has one constructor with topic name as parameter.
- */
-public class SchemeBuilder {
-
-    @SuppressWarnings("rawtypes")
-    public static Object buildFromClsName(String clsName, String topic, Map conf) throws Exception {
-        Object o = Class.forName(clsName).getConstructor(String.class, Map.class).newInstance(topic, conf);
-        return o;
-    }
-}