You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:29 UTC
[21/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
deleted file mode 100644
index a84b5dc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.serialization.impl.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class Serializers {
- private static final Map<StreamColumn.Type, Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
-
- public static <T> void register(StreamColumn.Type type, Serializer<T> serializer) {
- if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
- throw new IllegalArgumentException("Duplicated column type: " + type);
- }
- COLUMN_TYPE_SER_MAPPING.put(type, serializer);
- }
-
- @SuppressWarnings("unchecked")
- public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type) {
- if (COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
- return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);
- } else {
- throw new IllegalArgumentException("Serializer of type: " + type + " not found");
- }
- }
-
- public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider) {
- return new PartitionedEventSerializerImpl(metadataProvider);
- }
-
- static {
- register(StreamColumn.Type.STRING, new StringSerializer());
- register(StreamColumn.Type.INT, new IntegerSerializer());
- register(StreamColumn.Type.LONG, new LongSerializer());
- register(StreamColumn.Type.FLOAT, new FloatSerializer());
- register(StreamColumn.Type.DOUBLE, new DoubleSerializer());
- register(StreamColumn.Type.BOOL, new BooleanSerializer());
- register(StreamColumn.Type.OBJECT, new JavaObjectSerializer());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
deleted file mode 100644
index 1e90569..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class BooleanSerializer implements Serializer<Boolean> {
- @Override
- public void serialize(Boolean value, DataOutput dataOutput) throws IOException {
- dataOutput.writeBoolean(value);
- }
-
- @Override
- public Boolean deserialize(DataInput dataInput) throws IOException {
- return dataInput.readBoolean();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
deleted file mode 100644
index df56124..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class DoubleSerializer implements Serializer<Object> {
- @Override
- public void serialize(Object value, DataOutput dataOutput) throws IOException {
- if (value instanceof Number) {
- value = ((Number)value).doubleValue();
- }
- dataOutput.writeDouble((double)value);
- }
-
- @Override
- public Object deserialize(DataInput dataInput) throws IOException {
- return dataInput.readDouble();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
deleted file mode 100644
index 0ae48e3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class FloatSerializer implements Serializer<Object> {
- @Override
- public void serialize(Object value, DataOutput dataOutput) throws IOException {
- if (value instanceof Number) {
- value = ((Number)value).floatValue();
- }
- dataOutput.writeFloat((float)value);
- }
-
- @Override
- public Object deserialize(DataInput dataInput) throws IOException {
- return dataInput.readFloat();
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
deleted file mode 100644
index b698167..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class IntegerSerializer implements Serializer<Object> {
- @Override
- public void serialize(Object value, DataOutput dataOutput) throws IOException {
- if (value instanceof Number) {
- value = ((Number) value).intValue();
- }
- dataOutput.writeInt((int) value);
- }
-
- @Override
- public Object deserialize(DataInput dataInput) throws IOException {
- return dataInput.readInt();
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
deleted file mode 100644
index 14d9ea5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-import org.apache.commons.lang3.SerializationUtils;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-
-public class JavaObjectSerializer implements Serializer<Object> {
- @Override
- public void serialize(Object value, DataOutput dataOutput) throws IOException {
- byte[] bytes = SerializationUtils.serialize((Serializable) value);
- dataOutput.writeInt(bytes.length);
- dataOutput.write(bytes);
- }
-
- @Override
- public Object deserialize(DataInput dataInput) throws IOException {
- int len = dataInput.readInt();
- byte[] bytes = new byte[len];
- dataInput.readFully(bytes);
- return SerializationUtils.deserialize(bytes);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
deleted file mode 100644
index efe7e3a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public class LongSerializer implements Serializer<Object> {
- @Override
- public void serialize(Object value, DataOutput dataOutput) throws IOException {
- if (value instanceof Number) {
- value = ((Number) value).longValue();
- }
- dataOutput.writeLong((long) value);
- }
-
- @Override
- public Long deserialize(DataInput dataInput) throws IOException {
- return dataInput.readLong();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
deleted file mode 100644
index 2b0140f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-import org.apache.eagle.alert.engine.utils.CompressionUtils;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Stream Metadata Cached Serializer
- *
- * <p> Performance:
- *
- * 1) VS Kryo Direct: reduce 73.4% space (bytes) and 42.5 % time (ms).
- * 2) VS Java Native: reduce 92.5% space (bytes) and 94.2% time (ms)
- * </p>
- *
- * <p>Tips:
- * 1) Without-compression performs better than with compression for small event
- * </p>
- *
- * <p>TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?</p>
- *
- * @see PartitionedEvent
- */
-public class PartitionedEventSerializerImpl implements Serializer<PartitionedEvent>, PartitionedEventSerializer {
- private final StreamEventSerializer streamEventSerializer;
- private final Serializer<StreamPartition> streamPartitionSerializer;
- private final boolean compress;
-
- /**
- * @param serializationMetadataProvider metadata provider.
- * @param compress false by default.
- */
- public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider, boolean compress) {
- this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
- this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
- this.compress = compress;
- }
-
- public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider) {
- this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
- this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
- this.compress = false;
- }
-
- @Override
- public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
- dataOutput.writeLong(entity.getPartitionKey());
- streamEventSerializer.serialize(entity.getEvent(), dataOutput);
- streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
- }
-
- @Override
- public byte[] serialize(PartitionedEvent entity) throws IOException {
- ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
- this.serialize(entity, dataOutput);
- return compress ? CompressionUtils.compress(dataOutput.toByteArray()) : dataOutput.toByteArray();
- }
-
- @Override
- public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
- PartitionedEvent event = new PartitionedEvent();
- event.setPartitionKey(dataInput.readLong());
- StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput);
- StreamPartition partition = streamPartitionSerializer.deserialize(dataInput);
- event.setEvent(streamEvent);
- partition.setStreamId(streamEvent.getStreamId());
- event.setPartition(partition);
- return event;
- }
-
-
- @Override
- public PartitionedEvent deserialize(byte[] bytes) throws IOException {
- return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes) : bytes));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
deleted file mode 100644
index 8ffcb83..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.BitSet;
-
-/**
- * StreamEventSerializer.
- *
- * @see StreamEvent
- */
-public class StreamEventSerializer implements Serializer<StreamEvent> {
- private final SerializationMetadataProvider serializationMetadataProvider;
-
- public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider) {
- this.serializationMetadataProvider = serializationMetadataProvider;
- }
-
- private BitSet isNullBitSet(Object[] objects) {
- BitSet bitSet = new BitSet();
- int i = 0;
- for (Object obj : objects) {
- bitSet.set(i, obj == null);
- i++;
- }
- return bitSet;
- }
-
- @Override
- public void serialize(StreamEvent event, DataOutput dataOutput) throws IOException {
- // Bryant: here "metaVersion/streamId" writes to dataOutputUTF
- String metaVersion = event.getMetaVersion();
- String streamId = event.getStreamId();
- String metaVersionStreamId = String.format("%s/%s", metaVersion, streamId);
-
- dataOutput.writeUTF(metaVersionStreamId);
- dataOutput.writeLong(event.getTimestamp());
- if (event.getData() == null || event.getData().length == 0) {
- dataOutput.writeInt(0);
- } else {
- BitSet isNullIndex = isNullBitSet(event.getData());
- byte[] isNullBytes = isNullIndex.toByteArray();
- dataOutput.writeInt(isNullBytes.length);
- dataOutput.write(isNullBytes);
- int i = 0;
- StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
- if (definition == null) {
- throw new IOException("StreamDefinition not found: " + event.getStreamId());
- }
- if (event.getData().length != definition.getColumns().size()) {
- throw new IOException("Event :" + event + " doesn't match with schema: " + definition);
- }
- for (StreamColumn column : definition.getColumns()) {
- if (!isNullIndex.get(i)) {
- Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i], dataOutput);
- }
- i++;
- }
- }
- }
-
- @Override
- public StreamEvent deserialize(DataInput dataInput) throws IOException {
- StreamEvent event = new StreamEvent();
- String metaVersionStreamId = dataInput.readUTF();
- String streamId = metaVersionStreamId.split("/")[1];
- String metaVersion = metaVersionStreamId.split("/")[0];
- // sometimes metaVersionStreamId will be "null/id", then metaVersion will be "null" rather than null
- // need to handle it for future use
- if (metaVersion.equals("null")) {
- metaVersion = null;
- }
-
- event.setStreamId(streamId);
- event.setMetaVersion(metaVersion);
-
- StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
- event.setTimestamp(dataInput.readLong());
- int isNullBytesLen = dataInput.readInt();
- byte[] isNullBytes = new byte[isNullBytesLen];
- dataInput.readFully(isNullBytes);
- BitSet isNullIndex = BitSet.valueOf(isNullBytes);
- Object[] attributes = new Object[definition.getColumns().size()];
- int i = 0;
- for (StreamColumn column : definition.getColumns()) {
- if (!isNullIndex.get(i)) {
- attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput);
- }
- i++;
- }
- event.setData(attributes);
- return event;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
deleted file mode 100644
index 6a47f1e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.*;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Don't serialize streamId.
- *
- * @see StreamPartition
- */
-public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> {
- public static final StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
-
- private final Map<DigestBytes, StreamPartition> checkSumPartitionMap = new HashMap<>();
- private final Map<StreamPartition, DigestBytes> partitionCheckSumMap = new HashMap<>();
-
- @Override
- public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
- DigestBytes checkSum = partitionCheckSumMap.get(partition);
- if (checkSum == null) {
- try {
- checkSum = digestCheckSum(partition);
- partitionCheckSumMap.put(partition, checkSum);
- checkSumPartitionMap.put(checkSum, partition);
- } catch (NoSuchAlgorithmException e) {
- throw new IOException(e);
- }
- }
- dataOutput.writeInt(checkSum.size());
- dataOutput.write(checkSum.toByteArray());
- }
-
- @Override
- public StreamPartition deserialize(DataInput dataInput) throws IOException {
- int checkSumLen = dataInput.readInt();
- byte[] checksum = new byte[checkSumLen];
- dataInput.readFully(checksum);
- StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum));
- if (partition == null) {
- throw new IOException("Illegal partition checksum: " + checksum);
- }
- return partition;
- }
-
- private class DigestBytes {
- private final byte[] data;
-
- public DigestBytes(byte[] bytes) {
- this.data = bytes;
- }
-
- @Override
- public boolean equals(Object other) {
- return other instanceof DigestBytes && Arrays.equals(data, ((DigestBytes) other).data);
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(data);
- }
-
- public int size() {
- return data.length;
- }
-
- public byte[] toByteArray() {
- return data;
- }
- }
-
- private DigestBytes digestCheckSum(Object obj) throws IOException, NoSuchAlgorithmException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(obj);
- oos.close();
- MessageDigest m = MessageDigest.getInstance("SHA1");
- m.update(baos.toByteArray());
- return new DigestBytes(m.digest());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
deleted file mode 100644
index 411368f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Don't serialize streamId.
- *
- * @see StreamPartition
- */
-public class StreamPartitionSerializer implements Serializer<StreamPartition> {
- public static final StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
-
- @Override
- public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
- dataOutput.writeUTF(partition.getType().toString());
- if (partition.getColumns() == null || partition.getColumns().size() == 0) {
- dataOutput.writeInt(0);
- } else {
- dataOutput.writeInt(partition.getColumns().size());
- for (String column : partition.getColumns()) {
- dataOutput.writeUTF(column);
- }
- }
- if (partition.getSortSpec() == null) {
- dataOutput.writeByte(0);
- } else {
- dataOutput.writeByte(1);
- dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod());
- dataOutput.writeInt(partition.getSortSpec().getWindowMargin());
- }
- }
-
- @Override
- public StreamPartition deserialize(DataInput dataInput) throws IOException {
- StreamPartition partition = new StreamPartition();
- partition.setType(StreamPartition.Type.locate(dataInput.readUTF()));
- int colSize = dataInput.readInt();
- if (colSize > 0) {
- List<String> columns = new ArrayList<>(colSize);
- for (int i = 0; i < colSize; i++) {
- columns.add(dataInput.readUTF());
- }
- partition.setColumns(columns);
- }
- if (dataInput.readByte() == 1) {
- String period = dataInput.readUTF();
- int margin = dataInput.readInt();
-
- StreamSortSpec sortSpec = new StreamSortSpec();
- sortSpec.setWindowPeriod(period);
- sortSpec.setWindowMargin(margin);
- partition.setSortSpec(sortSpec);
- }
- return partition;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
deleted file mode 100644
index 2a1541a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.eagle.alert.engine.serialization.impl;
-
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class StringSerializer implements Serializer<String> {
- @Override
- public void serialize(String value, DataOutput dataOutput) throws IOException {
- dataOutput.writeUTF(value);
- }
-
- @Override
- public String deserialize(DataInput dataInput) throws IOException {
- return dataInput.readUTF();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
deleted file mode 100644
index 599f349..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
-
-import java.util.LinkedList;
-
-/**
- * @since Apr 1, 2016.
- */
-public class AttributeCollectAggregator extends AttributeAggregator {
-
- private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
-
- private LinkedList<Object> value;
-
- public AttributeCollectAggregator() {
- value = new LinkedList<Object>();
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public void stop() {
- }
-
- @Override
- public Object[] currentState() {
- return value.toArray();
- }
-
- @Override
- public void restoreState(Object[] arg0) {
- value = new LinkedList<Object>();
- if (arg0 != null) {
- for (Object o : arg0) {
- value.add(o);
- }
- }
- }
-
- @Override
- public Type getReturnType() {
- return Attribute.Type.OBJECT;
- }
-
- @Override
- protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
- // TODO: Support max of elements?
- }
-
- @Override
- public Object processAdd(Object arg0) {
- value.add(arg0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("processAdd: current values are : " + value);
- }
- return ImmutableList.copyOf(value);
- }
-
- @Override
- public Object processAdd(Object[] arg0) {
- value.add(arg0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("processAdd: current values are : " + value);
- }
- return ImmutableList.copyOf(value);
- }
-
- // / NOTICE: non O(1)
- @Override
- public Object processRemove(Object arg0) {
- value.remove(arg0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("processRemove: current values are : " + value);
- }
- return ImmutableList.copyOf(value);
- }
-
- // / NOTICE: non O(1)
- @Override
- public Object processRemove(Object[] arg0) {
- value.remove(arg0);
- LOG.info("processRemove: current values are : " + value);
- return ImmutableList.copyOf(value);
- }
-
- @Override
- public Object reset() {
- value.clear();
- return value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
deleted file mode 100644
index 101d05b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectWithDistinctAggregator.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.Attribute.Type;
-
-import java.util.LinkedList;
-
-public class AttributeCollectWithDistinctAggregator extends AttributeAggregator {
-
- private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
-
- private LinkedList<Object> value;
-
- public AttributeCollectWithDistinctAggregator() {
- value = new LinkedList<Object>();
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public void stop() {
- }
-
- @Override
- public Object[] currentState() {
- return value.toArray();
- }
-
- @Override
- public void restoreState(Object[] arg0) {
- value = new LinkedList<Object>();
- if (arg0 != null) {
- for (Object o : arg0) {
- value.add(o);
- }
- }
- }
-
- @Override
- public Type getReturnType() {
- return Attribute.Type.OBJECT;
- }
-
- @Override
- protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
- // TODO: Support max of elements?
- }
-
- @Override
- public Object processAdd(Object arg0) {
- // AttributeAggregator.process is already synchronized
- if (value.contains(arg0)) {
- value.remove(arg0);
- }
- value.add(arg0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("processAdd: current values are : " + value);
- }
- return ImmutableList.copyOf(value);
- }
-
- @Override
- public Object processAdd(Object[] arg0) {
- // AttributeAggregator.process is already synchronized
- if (value.contains(arg0)) {
- value.remove(arg0);
- }
- value.add(arg0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("processAdd: current values are : " + value);
- }
- return ImmutableList.copyOf(value);
- }
-
- // / NOTICE: non O(1)
- @Override
- public Object processRemove(Object arg0) {
- value.remove(arg0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("processRemove: current values are : " + value);
- }
- return ImmutableList.copyOf(value);
- }
-
- // / NOTICE: non O(1)
- @Override
- public Object processRemove(Object[] arg0) {
- value.remove(arg0);
- LOG.info("processRemove: current values are : " + value);
- return ImmutableList.copyOf(value);
- }
-
- @Override
- public Object reset() {
- value.clear();
- return value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
deleted file mode 100644
index 27df63b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/ContainsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-public class ContainsIgnoreCaseExtension extends FunctionExecutor {
-
- Attribute.Type returnType = Attribute.Type.BOOL;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, "
- + "but found " + attributeExpressionExecutors.length);
- }
- if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, "
- + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, "
- + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
- }
- }
-
- @Override
- protected Object execute(Object[] data) {
- if (data[0] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
- }
- if (data[1] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
- }
- String str1 = (String) data[0];
- String str2 = (String) data[1];
- return str1.toUpperCase().contains(str2.toUpperCase());
- }
-
- @Override
- protected Object execute(Object data) {
- return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
- }
-
- @Override
- public void start() {
- //Nothing to start
- }
-
- @Override
- public void stop() {
- //Nothing to stop
- }
-
- @Override
- public Attribute.Type getReturnType() {
- return returnType;
- }
-
- @Override
- public Object[] currentState() {
- return new Object[] {};
- }
-
- @Override
- public void restoreState(Object[] state) {
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
deleted file mode 100644
index 1292e05..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/EqualsIgnoreCaseExtension.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-public class EqualsIgnoreCaseExtension extends FunctionExecutor {
-
- Attribute.Type returnType = Attribute.Type.BOOL;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, "
- + "but found " + attributeExpressionExecutors.length);
- }
- if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, "
- + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, "
- + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
- }
- }
-
- @Override
- protected Object execute(Object[] data) {
- if (data[0] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
- }
- if (data[1] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
- }
- String str1 = (String) data[0];
- String str2 = (String) data[1];
- return str1.equalsIgnoreCase(str2);
- }
-
- @Override
- protected Object execute(Object data) {
- return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
- }
-
- @Override
- public void start() {
- //Nothing to start
- }
-
- @Override
- public void stop() {
- //Nothing to stop
- }
-
- @Override
- public Attribute.Type getReturnType() {
- return returnType;
- }
-
- @Override
- public Object[] currentState() {
- return new Object[] {};
- }
-
- @Override
- public void restoreState(Object[] state) {
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
deleted file mode 100644
index fe2280f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.wso2.siddhi.core.config.ExecutionPlanContext;
-import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
-import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * regexpIgnoreCase(string, regex)
- * Tells whether or not this 'string' matches the given regular expression 'regex'.
- * Accept Type(s): (STRING,STRING)
- * Return Type(s): BOOLEAN
- */
-public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
-
- //state-variables
- boolean isRegexConstant = false;
- String regexConstant;
- Pattern patternConstant;
-
- @Override
- protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
- if (attributeExpressionExecutors.length != 2) {
- throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, "
- + "but found " + attributeExpressionExecutors.length);
- }
- if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, "
- + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[0].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
- throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, "
- + "required " + Attribute.Type.STRING + ", but found " + attributeExpressionExecutors[1].getReturnType().toString());
- }
- if (attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
- isRegexConstant = true;
- regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
- patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
- }
- }
-
- @Override
- protected Object execute(Object[] data) {
- String regex;
- Pattern pattern;
- Matcher matcher;
-
- if (data[0] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
- }
- if (data[1] == null) {
- throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
- }
- String source = (String) data[0];
-
- if (!isRegexConstant) {
- regex = (String) data[1];
- pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
- matcher = pattern.matcher(source);
- return matcher.matches();
-
- } else {
- matcher = patternConstant.matcher(source);
- return matcher.matches();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
deleted file mode 100644
index a72f728..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * TODO: Make sure thread-safe.
- * TODO: Leverage Off-Heap Memory to persist append-only events collection.
- */
-public abstract class BaseStreamWindow implements StreamWindow {
- private final long endTime;
- private final long startTime;
- private final long margin;
- private final AtomicBoolean expired;
- private final long createdTime;
- private static final Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
- private PartitionedEventCollector collector;
- private final AtomicLong lastFlushedStreamTime;
- private final AtomicLong lastFlushedSystemTime;
-
- public BaseStreamWindow(long startTime, long endTime, long marginTime) {
- if (startTime >= endTime) {
- throw new IllegalArgumentException("startTime: " + startTime + " >= endTime: " + endTime + ", expected: startTime < endTime");
- }
- if (marginTime > endTime - startTime) {
- throw new IllegalArgumentException("marginTime: " + marginTime + " > endTime: " + endTime + " - startTime " + startTime + ", expected: marginTime < endTime - startTime");
- }
- this.startTime = startTime;
- this.endTime = endTime;
- this.margin = marginTime;
- this.expired = new AtomicBoolean(false);
- this.createdTime = System.currentTimeMillis();
- this.lastFlushedStreamTime = new AtomicLong(0);
- this.lastFlushedSystemTime = new AtomicLong(this.createdTime);
- }
-
- @Override
- public void register(PartitionedEventCollector collector) {
- if (this.collector != null) {
- throw new IllegalArgumentException("Duplicated collector error");
- }
- this.collector = collector;
- }
-
- @Override
- public long createdTime() {
- return createdTime;
- }
-
- public long startTime() {
- return this.startTime;
- }
-
- @Override
- public long rejectTime() {
- return this.lastFlushedStreamTime.get();
- }
-
- @Override
- public long margin() {
- return this.margin;
- }
-
- public long endTime() {
- return this.endTime;
- }
-
- public boolean accept(final long eventTime) {
- return !expired() && eventTime >= startTime && eventTime < endTime
- && eventTime >= lastFlushedStreamTime.get(); // dropped
- }
-
- public boolean expired() {
- return expired.get();
- }
-
- @Override
- public boolean alive() {
- return !expired.get();
- }
-
- /**
- * Expire when
- * 1) If stream time >= endTime + marginTime, then flush and expire
- * 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire.
- * 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire.
- * 4) else do nothing
- *
- * @param clock stream time clock
- * @param globalSystemTime system time clock
- */
- @Override
- public synchronized void onTick(StreamTimeClock clock, long globalSystemTime) {
- if (!expired()) {
- if (clock.getTime() >= endTime + margin) {
- LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}", clock.getStreamId(),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
- lastFlushedStreamTime.set(clock.getTime());
- lastFlushedSystemTime.set(globalSystemTime);
- flush();
- expired.set(true);
- } else if (globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0) {
- LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}", clock.getStreamId(),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
- lastFlushedStreamTime.set(clock.getTime());
- lastFlushedSystemTime.set(globalSystemTime);
- flush();
- if (lastFlushedStreamTime.get() >= this.endTime) {
- expired.set(true);
- }
- }
- } else {
- LOG.warn("Window has already expired, should not tick: {}", this.toString());
- }
- }
-
- public void close() {
- flush();
- expired.set(true);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(startTime).append(endTime).append(margin).build();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj instanceof BaseStreamWindow) {
- BaseStreamWindow another = (BaseStreamWindow) obj;
- return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
- }
- return false;
- }
-
- @Override
- public void flush() {
- if (this.collector == null) {
- throw new NullPointerException("Collector is not given before window flush");
- }
- this.flush(collector);
- }
-
- /**
- * @param collector PartitionedEventCollector.
- * @return max timestamp.
- */
- protected abstract void flush(PartitionedEventCollector collector);
-
- @Override
- public String toString() {
- return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
- this.margin,
- size(),
- this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime) : DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
- );
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
deleted file mode 100644
index 13e60d6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-/**
- * The time clock per stream should be thread-safe between getTime and moveForward.
- * By default, we currently simple support event timestamp now.
- */
-public interface StreamTimeClock {
- /**
- * Get stream id.
- *
- * @return stream id
- */
- String getStreamId();
-
- /**
- * Get current time.
- *
- * @return current timestamp value
- */
- long getTime();
-
- /**
- * @param timestamp move forward current time to given timestamp.
- */
- void moveForward(long timestamp);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
deleted file mode 100644
index b88f66e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-public interface StreamTimeClockListener {
- /**
- * StreamTimeClockListener onTick callback.
- *
- * @param streamTime
- * @param globalSystemTime
- * @see StreamWindow
- */
- void onTick(StreamTimeClock streamTime, long globalSystemTime);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
deleted file mode 100644
index 08878fd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.io.Serializable;
-
-/**
- * By default, we could keep the current time clock in memory,
- * Eventually we may need to consider the global time synchronization across all nodes
- *
- * <p>TODO: maybe need to synchronize time clock globally</p>
- *
- * <p>1) When to initialize window according to start time
- * 2) When to close expired window according to current time
- * 3) Automatically tick periodically as the single place for control lock.</p>
- */
-public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable {
- /**
- * @return StreamTimeClock instance.
- */
- StreamTimeClock createStreamTimeClock(String streamId);
-
- StreamTimeClock getStreamTimeClock(String streamId);
-
- void removeStreamTimeClock(String streamId);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
deleted file mode 100644
index 494ef05..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-
-/**
- * Possible implementation:
- *
- * <p>1) EventTimeClockTrigger (by default).
- * 2) SystemTimeClockTrigger.</p>
- */
-public interface StreamTimeClockTrigger {
- /**
- * @param streamId stream id to listen to.
- * @param listener to watch on streamId.
- */
- void registerListener(String streamId, StreamTimeClockListener listener);
-
- void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener);
-
- /**
- * @param listener listener to remove.
- */
- void removeListener(StreamTimeClockListener listener);
-
- /**
- * Trigger tick of all listeners on certain stream.
- *
- * @param streamId stream id
- */
- void triggerTickOn(String streamId);
-
- /**
- * Update time per new event time on stream.
- *
- * @param streamId
- * @param timestamp
- */
- void onTimeUpdate(String streamId, long timestamp);
-
- void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
deleted file mode 100644
index c30f00f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * <h2>Tumbling Window instead Sliding Window</h2>
- * We could have time overlap to sort out-of-ordered stream,
- * but each window should never have events overlap, otherwise will have logic problem.
- * <h2>Ingestion Time Policy</h2>
- * Different notions of time, namely processing time, event time, and ingestion time.
- * <ol>
- * <li>
- * In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window,
- * i.e., a one minute processing time window collects elements for exactly one minute.
- * </li>
- * <li>
- * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for
- * many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the
- * event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics
- * from the actual serving speed of the source and the processing performance of system.
- * Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program.
- * It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery.
- *
- * Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common
- * if a data stream gathers events from distributed sources.
- * </li>
- * <li>
- * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive
- * in the system (at the source) and continues processing with event time semantics based on the attached timestamps.
- * </li>
- * </ol>
- */
-public interface StreamWindow extends StreamTimeClockListener {
- /**
- * @return Created timestamp.
- */
- long createdTime();
-
- /**
- * Get start time.
- */
- long startTime();
-
- long margin();
-
- /**
- * @return reject timestamp < rejectTime().
- */
- long rejectTime();
-
- /**
- * Get end time.
- */
- long endTime();
-
- /**
- * @param timestamp event time.
- * @return true/false in boolean.
- */
- boolean accept(long timestamp);
-
- /**
- * Window is expired.
- *
- * @return whether window is expired
- */
- boolean expired();
-
- /**
- * @return whether window is alive.
- */
- boolean alive();
-
- boolean add(PartitionedEvent event);
-
- void flush();
-
- /**
- * Close window.
- */
- void close();
-
- void register(PartitionedEventCollector collector);
-
- int size();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
deleted file mode 100644
index efa1014..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import java.io.Closeable;
-import java.util.Collection;
-
-/**
- * TODO: Reuse existing expired window to avoid recreating new windows again and again
- * <p>Single stream window manager.</p>
- */
-public interface StreamWindowManager extends StreamTimeClockListener, Closeable {
-
- /**
- * addNewWindow.
- */
- StreamWindow addNewWindow(long initialTime);
-
- /**
- * removeWindow.
- */
- void removeWindow(StreamWindow window);
-
- /**
- * hasWindow.
- *
- * @return if has window
- */
- boolean hasWindow(StreamWindow window);
-
- /**
- * @param timestamp time.
- * @return whether window exists for time.
- */
- boolean hasWindowFor(long timestamp);
-
- /**
- * @return Internal collection for performance optimization.
- */
- Collection<StreamWindow> getWindows();
-
- StreamWindow getWindowFor(long timestamp);
-
- boolean reject(long timestamp);
-}
\ No newline at end of file