You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:29 UTC

[21/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
deleted file mode 100644
index a84b5dc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.serialization.impl.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class Serializers {
-    private static final Map<StreamColumn.Type, Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
-
-    public static <T> void register(StreamColumn.Type type, Serializer<T> serializer) {
-        if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
-            throw new IllegalArgumentException("Duplicated column type: " + type);
-        }
-        COLUMN_TYPE_SER_MAPPING.put(type, serializer);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type) {
-        if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
-            return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);
-        } else {
-            throw new IllegalArgumentException("Serializer of type: " + type + " not found");
-        }
-    }
-
-    public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider) {
-        return new PartitionedEventSerializerImpl(metadataProvider);
-    }
-
-    static {
-        register(StreamColumn.Type.STRING, new StringSerializer());
-        register(StreamColumn.Type.INT, new IntegerSerializer());
-        register(StreamColumn.Type.LONG, new LongSerializer());
-        register(StreamColumn.Type.FLOAT, new FloatSerializer());
-        register(StreamColumn.Type.DOUBLE, new DoubleSerializer());
-        register(StreamColumn.Type.BOOL, new BooleanSerializer());
-        register(StreamColumn.Type.OBJECT, new JavaObjectSerializer());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
deleted file mode 100644
index 1e90569..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class BooleanSerializer implements Serializer<Boolean> {
-    @Override
-    public void serialize(Boolean value, DataOutput dataOutput) throws IOException {
-        dataOutput.writeBoolean(value);
-    }
-
-    @Override
-    public Boolean deserialize(DataInput dataInput) throws IOException {
-        return dataInput.readBoolean();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
deleted file mode 100644
index df56124..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class DoubleSerializer implements Serializer<Object> {
-    @Override
-    public void serialize(Object value, DataOutput dataOutput) throws IOException {
-        if (value instanceof Number) {
-            value = ((Number)value).doubleValue();
-        }
-        dataOutput.writeDouble((double)value);
-    }
-
-    @Override
-    public Object deserialize(DataInput dataInput) throws IOException {
-        return dataInput.readDouble();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
deleted file mode 100644
index 0ae48e3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class FloatSerializer implements Serializer<Object> {
-    @Override
-    public void serialize(Object value, DataOutput dataOutput) throws IOException {
-        if (value instanceof Number) {
-            value = ((Number)value).floatValue();
-        }
-        dataOutput.writeFloat((float)value);
-    }
-
-    @Override
-    public Object deserialize(DataInput dataInput) throws IOException {
-        return dataInput.readFloat();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
deleted file mode 100644
index b698167..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class IntegerSerializer implements Serializer<Object> {
-    @Override
-    public void serialize(Object value, DataOutput dataOutput) throws IOException {
-        if (value instanceof Number) {
-            value = ((Number) value).intValue();
-        }
-        dataOutput.writeInt((int) value);
-    }
-
-    @Override
-    public Object deserialize(DataInput dataInput) throws IOException {
-        return dataInput.readInt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
deleted file mode 100644
index 14d9ea5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-import org.apache.commons.lang3.SerializationUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-public class JavaObjectSerializer implements Serializer<Object> {
-    @Override
-    public void serialize(Object value, DataOutput dataOutput) throws IOException {
-        byte[] bytes = SerializationUtils.serialize((Serializable) value);
-        dataOutput.writeInt(bytes.length);
-        dataOutput.write(bytes);
-    }
-
-    @Override
-    public Object deserialize(DataInput dataInput) throws IOException {
-        int len = dataInput.readInt();
-        byte[] bytes = new byte[len];
-        dataInput.readFully(bytes);
-        return SerializationUtils.deserialize(bytes);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
deleted file mode 100644
index efe7e3a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class LongSerializer implements Serializer<Object> {
-    @Override
-    public void serialize(Object value, DataOutput dataOutput) throws IOException {
-        if (value instanceof Number) {
-            value = ((Number) value).longValue();
-        }
-        dataOutput.writeLong((long) value);
-    }
-
-    @Override
-    public Long deserialize(DataInput dataInput) throws IOException {
-        return dataInput.readLong();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
deleted file mode 100644
index 2b0140f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-import org.apache.eagle.alert.engine.utils.CompressionUtils;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Stream Metadata Cached Serializer
- *
- * <p> Performance:
- *
- * 1) VS Kryo Direct: reduce 73.4% space (bytes) and 42.5 % time (ms).
- * 2) VS Java Native: reduce 92.5% space (bytes) and 94.2% time (ms)
- * </p>
- *
- * <p>Tips:
- * 1) Without-compression performs better than with compression for small event
- * </p>
- *
- * <p>TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?</p>
- *
- * @see PartitionedEvent
- */
-public class PartitionedEventSerializerImpl implements Serializer<PartitionedEvent>, PartitionedEventSerializer {
-    private final StreamEventSerializer streamEventSerializer;
-    private final Serializer<StreamPartition> streamPartitionSerializer;
-    private final boolean compress;
-
-    /**
-     * @param serializationMetadataProvider metadata provider.
-     * @param compress                      false by default.
-     */
-    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider, boolean compress) {
-        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
-        this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
-        this.compress = compress;
-    }
-
-    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider) {
-        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
-        this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
-        this.compress = false;
-    }
-
-    @Override
-    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
-        dataOutput.writeLong(entity.getPartitionKey());
-        streamEventSerializer.serialize(entity.getEvent(), dataOutput);
-        streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
-    }
-
-    @Override
-    public byte[] serialize(PartitionedEvent entity) throws IOException {
-        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
-        this.serialize(entity, dataOutput);
-        return compress ? CompressionUtils.compress(dataOutput.toByteArray()) : dataOutput.toByteArray();
-    }
-
-    @Override
-    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
-        PartitionedEvent event = new PartitionedEvent();
-        event.setPartitionKey(dataInput.readLong());
-        StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput);
-        StreamPartition partition = streamPartitionSerializer.deserialize(dataInput);
-        event.setEvent(streamEvent);
-        partition.setStreamId(streamEvent.getStreamId());
-        event.setPartition(partition);
-        return event;
-    }
-
-
-    @Override
-    public PartitionedEvent deserialize(byte[] bytes) throws IOException {
-        return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes) : bytes));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
deleted file mode 100644
index 8ffcb83..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.BitSet;
-
-/**
- * StreamEventSerializer.
- *
- * @see StreamEvent
- */
-public class StreamEventSerializer implements Serializer<StreamEvent> {
-    private final SerializationMetadataProvider serializationMetadataProvider;
-
-    public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider) {
-        this.serializationMetadataProvider = serializationMetadataProvider;
-    }
-
-    private BitSet isNullBitSet(Object[] objects) {
-        BitSet bitSet = new BitSet();
-        int i = 0;
-        for (Object obj : objects) {
-            bitSet.set(i, obj == null);
-            i++;
-        }
-        return bitSet;
-    }
-
-    @Override
-    public void serialize(StreamEvent event, DataOutput dataOutput) throws IOException {
-        // Bryant: here "metaVersion/streamId" writes to dataOutputUTF
-        String metaVersion = event.getMetaVersion();
-        String streamId = event.getStreamId();
-        String metaVersionStreamId = String.format("%s/%s", metaVersion, streamId);
-
-        dataOutput.writeUTF(metaVersionStreamId);
-        dataOutput.writeLong(event.getTimestamp());
-        if (event.getData() == null || event.getData().length == 0) {
-            dataOutput.writeInt(0);
-        } else {
-            BitSet isNullIndex = isNullBitSet(event.getData());
-            byte[] isNullBytes = isNullIndex.toByteArray();
-            dataOutput.writeInt(isNullBytes.length);
-            dataOutput.write(isNullBytes);
-            int i = 0;
-            StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
-            if (definition == null) {
-                throw new IOException("StreamDefinition not found: " + event.getStreamId());
-            }
-            if (event.getData().length != definition.getColumns().size()) {
-                throw new IOException("Event :" + event + " doesn't match with schema: " + definition);
-            }
-            for (StreamColumn column : definition.getColumns()) {
-                if (!isNullIndex.get(i)) {
-                    Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i], dataOutput);
-                }
-                i++;
-            }
-        }
-    }
-
-    @Override
-    public StreamEvent deserialize(DataInput dataInput) throws IOException {
-        StreamEvent event = new StreamEvent();
-        String metaVersionStreamId = dataInput.readUTF();
-        String streamId = metaVersionStreamId.split("/")[1];
-        String metaVersion = metaVersionStreamId.split("/")[0];
-        // sometimes metaVersionStreamId will be "null/id", then metaVersion will be "null" rather than null
-        // need to handle it for future use
-        if (metaVersion.equals("null")) {
-            metaVersion = null;
-        }
-
-        event.setStreamId(streamId);
-        event.setMetaVersion(metaVersion);
-
-        StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
-        event.setTimestamp(dataInput.readLong());
-        int isNullBytesLen = dataInput.readInt();
-        byte[] isNullBytes = new byte[isNullBytesLen];
-        dataInput.readFully(isNullBytes);
-        BitSet isNullIndex = BitSet.valueOf(isNullBytes);
-        Object[] attributes = new Object[definition.getColumns().size()];
-        int i = 0;
-        for (StreamColumn column : definition.getColumns()) {
-            if (!isNullIndex.get(i)) {
-                attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput);
-            }
-            i++;
-        }
-        event.setData(attributes);
-        return event;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
deleted file mode 100644
index 6a47f1e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.*;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Don't serialize streamId.
- *
- * @see StreamPartition
- */
-public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> {
-    public static final StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
-
-    private final Map<DigestBytes, StreamPartition> checkSumPartitionMap = new HashMap<>();
-    private final Map<StreamPartition, DigestBytes> partitionCheckSumMap = new HashMap<>();
-
-    @Override
-    public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
-        DigestBytes checkSum = partitionCheckSumMap.get(partition);
-        if (checkSum == null) {
-            try {
-                checkSum = digestCheckSum(partition);
-                partitionCheckSumMap.put(partition, checkSum);
-                checkSumPartitionMap.put(checkSum, partition);
-            } catch (NoSuchAlgorithmException e) {
-                throw new IOException(e);
-            }
-        }
-        dataOutput.writeInt(checkSum.size());
-        dataOutput.write(checkSum.toByteArray());
-    }
-
-    @Override
-    public StreamPartition deserialize(DataInput dataInput) throws IOException {
-        int checkSumLen = dataInput.readInt();
-        byte[] checksum = new byte[checkSumLen];
-        dataInput.readFully(checksum);
-        StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum));
-        if (partition == null) {
-            throw new IOException("Illegal partition checksum: " + checksum);
-        }
-        return partition;
-    }
-
-    private class DigestBytes {
-        private final byte[] data;
-
-        public DigestBytes(byte[] bytes) {
-            this.data = bytes;
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            return other instanceof DigestBytes && Arrays.equals(data, ((DigestBytes) other).data);
-        }
-
-        @Override
-        public int hashCode() {
-            return Arrays.hashCode(data);
-        }
-
-        public int size() {
-            return data.length;
-        }
-
-        public byte[] toByteArray() {
-            return data;
-        }
-    }
-
-    private DigestBytes digestCheckSum(Object obj) throws IOException, NoSuchAlgorithmException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(obj);
-        oos.close();
-        MessageDigest m = MessageDigest.getInstance("SHA1");
-        m.update(baos.toByteArray());
-        return new DigestBytes(m.digest());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
deleted file mode 100644
index 411368f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Don't serialize streamId.
- *
- * @see StreamPartition
- */
-public class StreamPartitionSerializer implements Serializer<StreamPartition> {
-    public static final StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
-
-    @Override
-    public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
-        dataOutput.writeUTF(partition.getType().toString());
-        if (partition.getColumns() == null || partition.getColumns().size() == 0) {
-            dataOutput.writeInt(0);
-        } else {
-            dataOutput.writeInt(partition.getColumns().size());
-            for (String column : partition.getColumns()) {
-                dataOutput.writeUTF(column);
-            }
-        }
-        if (partition.getSortSpec() == null) {
-            dataOutput.writeByte(0);
-        } else {
-            dataOutput.writeByte(1);
-            dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod());
-            dataOutput.writeInt(partition.getSortSpec().getWindowMargin());
-        }
-    }
-
-    @Override
-    public StreamPartition deserialize(DataInput dataInput) throws IOException {
-        StreamPartition partition = new StreamPartition();
-        partition.setType(StreamPartition.Type.locate(dataInput.readUTF()));
-        int colSize = dataInput.readInt();
-        if (colSize > 0) {
-            List<String> columns = new ArrayList<>(colSize);
-            for (int i = 0; i < colSize; i++) {
-                columns.add(dataInput.readUTF());
-            }
-            partition.setColumns(columns);
-        }
-        if (dataInput.readByte() == 1) {
-            String period = dataInput.readUTF();
-            int margin = dataInput.readInt();
-
-            StreamSortSpec sortSpec = new StreamSortSpec();
-            sortSpec.setWindowPeriod(period);
-            sortSpec.setWindowMargin(margin);
-            partition.setSortSpec(sortSpec);
-        }
-        return partition;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
deleted file mode 100644
index 2a1541a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class StringSerializer implements Serializer<String> {
-    @Override
-    public void serialize(String value, DataOutput dataOutput) throws IOException {
-        dataOutput.writeUTF(value);
-    }
-
-    @Override
-    public String deserialize(DataInput dataInput) throws IOException {
-        return dataInput.readUTF();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
deleted file mode 100644
index 599f349..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
-
-import java.util.LinkedList;
-
-/**
- * @since Apr 1, 2016.
- */
-public class AttributeCollectAggregator extends AttributeAggregator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
-
-    private LinkedList<Object> value;
-
-    public AttributeCollectAggregator() {
-        value = new LinkedList<Object>();
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public Object[] currentState() {
-        return value.toArray();
-    }
-
-    @Override
-    public void restoreState(Object[] arg0) {
-        value = new LinkedList<Object>();
-        if (arg0 != null) {
-            for (Object o : arg0) {
-                value.add(o);
-            }
-        }
-    }
-
-    @Override
-    public Type getReturnType() {
-        return Attribute.Type.OBJECT;
-    }
-
-    @Override
-    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
-        // TODO: Support max of elements?
-    }
-
-    @Override
-    public Object processAdd(Object arg0) {
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    @Override
-    public Object processAdd(Object[] arg0) {
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object arg0) {
-        value.remove(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processRemove: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object[] arg0) {
-        value.remove(arg0);
-        LOG.info("processRemove: current values are : " + value);
-        return ImmutableList.copyOf(value);
-    }
-
-    @Override
-    public Object reset() {
-        value.clear();
-        return value;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
deleted file mode 100644
index 101d05b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
-
-import java.util.LinkedList;
-
-public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
-
-    private LinkedList<Object> value;
-
-    public AttributeCollectWithDistinctAggregator() {
-        value = new LinkedList<Object>();
-    }
-
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public Object[] currentState() {
-        return value.toArray();
-    }
-
-    @Override
-    public void restoreState(Object[] arg0) {
-        value = new LinkedList<Object>();
-        if (arg0 != null) {
-            for (Object o : arg0) {
-                value.add(o);
-            }
-        }
-    }
-
-    @Override
-    public Type getReturnType() {
-        return Attribute.Type.OBJECT;
-    }
-
-    @Override
-    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
-        // TODO: Support max of elements?
-    }
-
-    @Override
-    public Object processAdd(Object arg0) {
-        // AttributeAggregator.process is already synchronized
-        if (value.contains(arg0)) {
-            value.remove(arg0);
-        }
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    @Override
-    public Object processAdd(Object[] arg0) {
-        // AttributeAggregator.process is already synchronized
-        if (value.contains(arg0)) {
-            value.remove(arg0);
-        }
-        value.add(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processAdd: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object arg0) {
-        value.remove(arg0);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("processRemove: current values are : " + value);
-        }
-        return ImmutableList.copyOf(value);
-    }
-
-    // / NOTICE: non O(1)
-    @Override
-    public Object processRemove(Object[] arg0) {
-        value.remove(arg0);
-        LOG.info("processRemove: current values are : " + value);
-        return ImmutableList.copyOf(value);
-    }
-
-    @Override
-    public Object reset() {
-        value.clear();
-        return value;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
deleted file mode 100644
index 27df63b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-public class ContainsIgnoreCaseExtension extends FunctionExecutor {
-
-    Attribute.Type returnType = Attribute.Type.BOOL;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
-        if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, "
-                + "but found " + attributeExpressionExecutors.length);
-        }
-        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
-        }
-    }
-
-    @Override
-    protected Object execute(Object[] data) {
-        if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
-        }
-        if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
-        }
-        String str1 = (String) data[0];
-        String str2 = (String) data[1];
-        return str1.toUpperCase().contains(str2.toUpperCase());
-    }
-
-    @Override
-    protected Object execute(Object data) {
-        return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
-    }
-
-    @Override
-    public void start() {
-        //Nothing to start
-    }
-
-    @Override
-    public void stop() {
-        //Nothing to stop
-    }
-
-    @Override
-    public Attribute.Type getReturnType() {
-        return returnType;
-    }
-
-    @Override
-    public Object[] currentState() {
-        return new Object[] {};
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
deleted file mode 100644
index 1292e05..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-public class EqualsIgnoreCaseExtension extends FunctionExecutor {
-
-    Attribute.Type returnType = Attribute.Type.BOOL;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
-        if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, "
-                + "but found " + attributeExpressionExecutors.length);
-        }
-        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
-        }
-    }
-
-    @Override
-    protected Object execute(Object[] data) {
-        if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
-        }
-        if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
-        }
-        String str1 = (String) data[0];
-        String str2 = (String) data[1];
-        return str1.equalsIgnoreCase(str2);
-    }
-
-    @Override
-    protected Object execute(Object data) {
-        return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
-    }
-
-    @Override
-    public void start() {
-        //Nothing to start
-    }
-
-    @Override
-    public void stop() {
-        //Nothing to stop
-    }
-
-    @Override
-    public Attribute.Type getReturnType() {
-        return returnType;
-    }
-
-    @Override
-    public Object[] currentState() {
-        return new Object[] {};
-    }
-
-    @Override
-    public void restoreState(Object[] state) {
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
deleted file mode 100644
index fe2280f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * regexpIgnoreCase(string, regex)
- * Tells whether or not this 'string' matches the given regular expression 'regex'.
- * Accept Type(s): (STRING,STRING)
- * Return Type(s): BOOLEAN
- */
-public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
-
-    //state-variables
-    boolean isRegexConstant = false;
-    String regexConstant;
-    Pattern patternConstant;
-
-    @Override
-    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
-        if (attributeExpressionExecutors.length != 2) {
-            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, "
-                + "but found " + attributeExpressionExecutors.length);
-        }
-        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
-            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, "
-                + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
-        }
-        if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
-            isRegexConstant = true;
-            regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
-            patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
-        }
-    }
-
-    @Override
-    protected Object execute(Object[] data) {
-        String regex;
-        Pattern pattern;
-        Matcher matcher;
-
-        if (data[0] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
-        }
-        if (data[1] == null) {
-            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
-        }
-        String source = (String) data[0];
-
-        if (!isRegexConstant) {
-            regex = (String) data[1];
-            pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
-            matcher = pattern.matcher(source);
-            return matcher.matches();
-
-        } else {
-            matcher = patternConstant.matcher(source);
-            return matcher.matches();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
deleted file mode 100644
index a72f728..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * TODO: Make sure thread-safe.
- * TODO: Leverage Off-Heap Memory to persist append-only events collection.
- */
-public abstract class BaseStreamWindow implements StreamWindow {
-    private final long endTime;
-    private final long startTime;
-    private final long margin;
-    private final AtomicBoolean expired;
-    private final long createdTime;
-    private static final Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
-    private PartitionedEventCollector collector;
-    private final AtomicLong lastFlushedStreamTime;
-    private final AtomicLong lastFlushedSystemTime;
-
-    public BaseStreamWindow(long startTime, long endTime, long marginTime) {
-        if (startTime >= endTime) {
-            throw new IllegalArgumentException("startTime: " + startTime + " >= endTime: " + endTime + ", expected: startTime < endTime");
-        }
-        if (marginTime > endTime - startTime) {
-            throw new IllegalArgumentException("marginTime: " + marginTime + " > endTime: " + endTime + " - startTime " + startTime + ", expected: marginTime < endTime - startTime");
-        }
-        this.startTime = startTime;
-        this.endTime = endTime;
-        this.margin = marginTime;
-        this.expired = new AtomicBoolean(false);
-        this.createdTime = System.currentTimeMillis();
-        this.lastFlushedStreamTime = new AtomicLong(0);
-        this.lastFlushedSystemTime = new AtomicLong(this.createdTime);
-    }
-
-    @Override
-    public void register(PartitionedEventCollector collector) {
-        if (this.collector != null) {
-            throw new IllegalArgumentException("Duplicated collector error");
-        }
-        this.collector = collector;
-    }
-
-    @Override
-    public long createdTime() {
-        return createdTime;
-    }
-
-    public long startTime() {
-        return this.startTime;
-    }
-
-    @Override
-    public long rejectTime() {
-        return this.lastFlushedStreamTime.get();
-    }
-
-    @Override
-    public long margin() {
-        return this.margin;
-    }
-
-    public long endTime() {
-        return this.endTime;
-    }
-
-    public boolean accept(final long eventTime) {
-        return !expired() && eventTime >= startTime && eventTime < endTime
-            && eventTime >= lastFlushedStreamTime.get(); // dropped
-    }
-
-    public boolean expired() {
-        return expired.get();
-    }
-
-    @Override
-    public boolean alive() {
-        return !expired.get();
-    }
-
-    /**
-     * Expire when
-     * 1) If stream time >= endTime + marginTime, then flush and expire
-     * 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire.
-     * 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire.
-     * 4) else do nothing
-     *
-     * @param clock            stream time clock
-     * @param globalSystemTime system time clock
-     */
-    @Override
-    public synchronized void onTick(StreamTimeClock clock, long globalSystemTime) {
-        if (!expired()) {
-            if (clock.getTime() >= endTime + margin) {
-                LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}", clock.getStreamId(),
-                    DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
-                lastFlushedStreamTime.set(clock.getTime());
-                lastFlushedSystemTime.set(globalSystemTime);
-                flush();
-                expired.set(true);
-            } else if (globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0) {
-                LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}", clock.getStreamId(),
-                    DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),
-                    DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
-                lastFlushedStreamTime.set(clock.getTime());
-                lastFlushedSystemTime.set(globalSystemTime);
-                flush();
-                if (lastFlushedStreamTime.get() >= this.endTime) {
-                    expired.set(true);
-                }
-            }
-        } else {
-            LOG.warn("Window has already expired, should not tick: {}", this.toString());
-        }
-    }
-
-    public void close() {
-        flush();
-        expired.set(true);
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(startTime).append(endTime).append(margin).build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj != null && obj instanceof BaseStreamWindow) {
-            BaseStreamWindow another = (BaseStreamWindow) obj;
-            return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
-        }
-        return false;
-    }
-
-    @Override
-    public void flush() {
-        if (this.collector == null) {
-            throw new NullPointerException("Collector is not given before window flush");
-        }
-        this.flush(collector);
-    }
-
-    /**
-     * @param collector PartitionedEventCollector.
-     * @return max timestamp.
-     */
-    protected abstract void flush(PartitionedEventCollector collector);
-
-    @Override
-    public String toString() {
-        return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
-            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
-            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
-            this.margin,
-            size(),
-            this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime) : DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
-        );
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
deleted file mode 100644
index 13e60d6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-/**
- * The time clock per stream should be thread-safe between getTime and moveForward.
- * By default, we currently simple support event timestamp now.
- */
-public interface StreamTimeClock {
-    /**
-     * Get stream id.
-     *
-     * @return stream id
-     */
-    String getStreamId();
-
-    /**
-     * Get current time.
-     *
-     * @return current timestamp value
-     */
-    long getTime();
-
-    /**
-     * @param timestamp move forward current time to given timestamp.
-     */
-    void moveForward(long timestamp);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
deleted file mode 100644
index b88f66e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-public interface StreamTimeClockListener {
-    /**
-     * StreamTimeClockListener onTick callback.
-     *
-     * @param streamTime
-     * @param globalSystemTime
-     * @see StreamWindow
-     */
-    void onTick(StreamTimeClock streamTime, long globalSystemTime);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
deleted file mode 100644
index 08878fd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.io.Serializable;
-
-/**
- * By default, we could keep the current time clock in memory,
- * Eventually we may need to consider the global time synchronization across all nodes
- *
- * <p>TODO: maybe need to synchronize time clock globally</p>
- *
- * <p>1) When to initialize window according to start time
- * 2) When to close expired window according to current time
- * 3) Automatically tick periodically as the single place for control lock.</p>
- */
-public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable {
-    /**
-     * @return StreamTimeClock instance.
-     */
-    StreamTimeClock createStreamTimeClock(String streamId);
-
-    StreamTimeClock getStreamTimeClock(String streamId);
-
-    void removeStreamTimeClock(String streamId);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
deleted file mode 100644
index 494ef05..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-
-/**
- * Possible implementation:
- *
- * <p>1) EventTimeClockTrigger (by default).
- * 2) SystemTimeClockTrigger.</p>
- */
-public interface StreamTimeClockTrigger {
-    /**
-     * @param streamId stream id to listen to.
-     * @param listener to watch on streamId.
-     */
-    void registerListener(String streamId, StreamTimeClockListener listener);
-
-    void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener);
-
-    /**
-     * @param listener listener to remove.
-     */
-    void removeListener(StreamTimeClockListener listener);
-
-    /**
-     * Trigger tick of all listeners on certain stream.
-     *
-     * @param streamId stream id
-     */
-    void triggerTickOn(String streamId);
-
-    /**
-     * Update time per new event time on stream.
-     *
-     * @param streamId
-     * @param timestamp
-     */
-    void onTimeUpdate(String streamId, long timestamp);
-
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
deleted file mode 100644
index c30f00f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * <h2>Tumbling Window instead Sliding Window</h2>
- * We could have time overlap to sort out-of-ordered stream,
- * but each window should never have events overlap, otherwise will have logic problem.
- * <h2>Ingestion Time Policy</h2>
- * Different notions of time, namely processing time, event time, and ingestion time.
- * <ol>
- * <li>
- * In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window,
- * i.e., a one minute processing time window collects elements for exactly one minute.
- * </li>
- * <li>
- * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for
- * many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the
- * event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics
- * from the actual serving speed of the source and the processing performance of system.
- * Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program.
- * It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery.
- *
- * Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common
- * if a data stream gathers events from distributed sources.
- * </li>
- * <li>
- * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive
- * in the system (at the source) and continues processing with event time semantics based on the attached timestamps.
- * </li>
- * </ol>
- */
-public interface StreamWindow extends StreamTimeClockListener {
-    /**
-     * @return Created timestamp.
-     */
-    long createdTime();
-
-    /**
-     * Get start time.
-     */
-    long startTime();
-
-    long margin();
-
-    /**
-     * @return reject timestamp < rejectTime().
-     */
-    long rejectTime();
-
-    /**
-     * Get end time.
-     */
-    long endTime();
-
-    /**
-     * @param timestamp event time.
-     * @return true/false in boolean.
-     */
-    boolean accept(long timestamp);
-
-    /**
-     * Window is expired.
-     *
-     * @return whether window is expired
-     */
-    boolean expired();
-
-    /**
-     * @return whether window is alive.
-     */
-    boolean alive();
-
-    boolean add(PartitionedEvent event);
-
-    void flush();
-
-    /**
-     * Close window.
-     */
-    void close();
-
-    void register(PartitionedEventCollector collector);
-
-    int size();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
deleted file mode 100644
index efa1014..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.io.Closeable;
-import java.util.Collection;
-
-/**
- * TODO: Reuse existing expired window to avoid recreating new windows again and again
- * <p>Single stream window manager.</p>
- */
-public interface StreamWindowManager extends StreamTimeClockListener, Closeable {
-
-    /**
-     * addNewWindow.
-     */
-    StreamWindow addNewWindow(long initialTime);
-
-    /**
-     * removeWindow.
-     */
-    void removeWindow(StreamWindow window);
-
-    /**
-     * hasWindow.
-     *
-     * @return if has window
-     */
-    boolean hasWindow(StreamWindow window);
-
-    /**
-     * @param timestamp time.
-     * @return whether window exists for time.
-     */
-    boolean hasWindowFor(long timestamp);
-
-    /**
-     * @return Internal collection for performance optimization.
-     */
-    Collection<StreamWindow> getWindows();
-
-    StreamWindow getWindowFor(long timestamp);
-
-    boolean reject(long timestamp);
-}
\ No newline at end of file