You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:28 UTC
[20/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
deleted file mode 100755
index 7a503e1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
-import com.google.common.base.Preconditions;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * ===== Benchmark Result Report =====<br/><br/>
- *
- * <p>Num. Operation Type Time<br/>
- * ---- --------- ---- ----<br/>
- * 1000 FlushTime DIRECT_MEMORY : 55<br/>
- * 1000 FlushTime FILE_RAF : 63<br/>
- * 1000 FlushTime MEMORY : 146<br/>
- * 1000 FlushTime ONHEAP : 17<br/>
- * 1000 InsertTime DIRECT_MEMORY : 68<br/>
- * 1000 InsertTime FILE_RAF : 223<br/>
- * 1000 InsertTime MEMORY : 273<br/>
- * 1000 InsertTime ONHEAP : 20<br/>
- * 10000 FlushTime DIRECT_MEMORY : 551<br/>
- * 10000 FlushTime FILE_RAF : 668<br/>
- * 10000 FlushTime MEMORY : 643<br/>
- * 10000 FlushTime ONHEAP : 5<br/>
- * 10000 InsertTime DIRECT_MEMORY : 446<br/>
- * 10000 InsertTime FILE_RAF : 2095<br/>
- * 10000 InsertTime MEMORY : 784<br/>
- * 10000 InsertTime ONHEAP : 29<br/>
- * 100000 FlushTime DIRECT_MEMORY : 6139<br/>
- * 100000 FlushTime FILE_RAF : 6237<br/>
- * 100000 FlushTime MEMORY : 6238<br/>
- * 100000 FlushTime ONHEAP : 18<br/>
- * 100000 InsertTime DIRECT_MEMORY : 4499<br/>
- * 100000 InsertTime FILE_RAF : 22343<br/>
- * 100000 InsertTime MEMORY : 4962<br/>
- * 100000 InsertTime ONHEAP : 107<br/>
- * 1000000 FlushTime DIRECT_MEMORY : 61356<br/>
- * 1000000 FlushTime FILE_RAF : 63025<br/>
- * 1000000 FlushTime MEMORY : 61380<br/>
- * 1000000 FlushTime ONHEAP : 47<br/>
- * 1000000 InsertTime DIRECT_MEMORY : 43637<br/>
- * 1000000 InsertTime FILE_RAF : 464481<br/>
- * 1000000 InsertTime MEMORY : 44367<br/>
- * 1000000 InsertTime ONHEAP : 2040<br/>
- * </p>
- * @see StreamSortedWindowOnHeap
- * @see org.mapdb.DBMaker
- */
-public class StreamWindowRepository {
- /**
- * Storage backends a window can be created with.
- * {@code createMapDB} maps each constant to a MapDB {@code DBMaker} factory; note that
- * {@code createWindow} builds a {@code StreamSortedWindowOnHeap} for {@link #ONHEAP} instead of going through MapDB.
- */
- public enum StorageType {
- /**
- * Creates new in-memory database which stores all data on heap without serialization.
- * This mode should be very fast, but data will affect the Garbage Collector the same way as traditional Java Collections.
- */
- ONHEAP,
-
- /**
- * Creates new in-memory database. Changes are lost after JVM exits.
- * This option serializes data into {@code byte[]},
- * so they are not affected by the Garbage Collector.
- */
- MEMORY,
-
- /**
- * <p>
- * Creates new in-memory database. Changes are lost after JVM exits.
- * </p><p>
- * This will use {@code DirectByteBuffer} outside of HEAP, so the Garbage Collector is not affected.
- * You should increase the amount of direct memory with the
- * {@code -XX:MaxDirectMemorySize=10G} JVM param
- * </p>
- */
- DIRECT_MEMORY,
-
- /**
- * File-backed database. The backing file is a temporary file obtained from
- * {@code File.createTempFile("window-", ".map")}.
- */
- FILE_RAF
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
- private final Map<StorageType, DB> dbPool;
-
- /**
- * Private constructor: starts with an empty DB pool. Instances are obtained through
- * {@code getSingletonInstance()}.
- */
- private StreamWindowRepository() {
- dbPool = new HashMap<>();
- }
-
- private static StreamWindowRepository repository;
-
- /**
-  * Returns the shared repository, creating it on first call.
-  *
-  * <p>When the instance is created, a JVM shutdown hook is registered that calls
-  * {@link #close()}, so pooled databases are closed automatically when the JVM exits.
-  * Access is serialized on the {@code StreamWindowRepository} class object.</p>
-  *
-  * @return StreamWindowRepository singletonInstance
-  */
- public static StreamWindowRepository getSingletonInstance() {
-     synchronized (StreamWindowRepository.class) {
-         if (repository != null) {
-             return repository;
-         }
-         repository = new StreamWindowRepository();
-         // Close every pooled DB when the JVM goes down.
-         Runtime.getRuntime().addShutdownHook(new Thread(() -> repository.close()));
-         return repository;
-     }
- }
-
- /**
-  * Returns the pooled MapDB instance for {@code storageType}, creating and pooling it on first use.
-  * At most one {@link DB} is kept per storage type; access to the pool is guarded by {@code dbPool}.
-  *
-  * @param storageType backend to create or look up
-  * @return the pooled database for that backend
-  * @throws IllegalStateException    if the temporary file for {@code FILE_RAF} cannot be created
-  * @throws IllegalArgumentException if the storage type is not handled
-  */
- private DB createMapDB(StorageType storageType) {
-     synchronized (dbPool) {
-         DB pooled = dbPool.get(storageType);
-         if (pooled != null) {
-             return pooled;
-         }
-
-         DB db;
-         switch (storageType) {
-             case ONHEAP:
-                 db = DBMaker.heapDB().closeOnJvmShutdown().make();
-                 LOG.info("Create ONHEAP mapdb");
-                 break;
-             case MEMORY:
-                 db = DBMaker.memoryDB().closeOnJvmShutdown().make();
-                 LOG.info("Create MEMORY mapdb");
-                 break;
-             case DIRECT_MEMORY:
-                 db = DBMaker.memoryDirectDB().closeOnJvmShutdown().make();
-                 LOG.info("Create DIRECT_MEMORY mapdb");
-                 break;
-             case FILE_RAF:
-                 try {
-                     File file = File.createTempFile("window-", ".map");
-                     // Validate before first use; previously this check ran after the file was already dereferenced.
-                     Preconditions.checkNotNull(file, "file is null");
-                     // Only the unique path is needed: the placeholder is removed so that MapDB
-                     // creates the file itself (presumably it expects a fresh path - TODO confirm).
-                     file.delete();
-                     file.deleteOnExit();
-                     db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
-                     LOG.info("Created FILE_RAF map file at {}", file.getAbsolutePath());
-                 } catch (IOException e) {
-                     throw new IllegalStateException(e);
-                 }
-                 break;
-             default:
-                 throw new IllegalArgumentException("Illegal storage type: " + storageType);
-         }
-         dbPool.put(storageType, db);
-         return db;
-     }
- }
-
- /**
-  * Creates a window backed by the requested storage type.
-  *
-  * <p>{@code ONHEAP} yields a {@link StreamSortedWindowOnHeap}; every other type yields a
-  * {@link StreamSortedWindowInMapDB} on the pooled DB for that type, using a random UUID as map name.</p>
-  *
-  * @param start  window start
-  * @param end    window end
-  * @param margin window margin
-  * @param type   storage backend
-  * @return the newly created window
-  */
- public StreamWindow createWindow(long start, long end, long margin, StorageType type) {
-     final StreamWindow window;
-     if (type == StorageType.ONHEAP) {
-         window = new StreamSortedWindowOnHeap(start, end, margin);
-     } else {
-         final DB db = createMapDB(type);
-         final String mapName = UUID.randomUUID().toString();
-         window = new StreamSortedWindowInMapDB(start, end, margin, db, mapName);
-     }
-
-     if (LOG.isDebugEnabled()) {
-         LOG.debug("Created new {}, type: {}", window, type);
-     }
-     return window;
- }
-
- /**
- * Creates a window by delegating to the given strategy, passing this repository to it.
- *
- * @param start window start
- * @param end window end
- * @param margin window margin
- * @param strategy strategy that picks the storage backend
- * @return the window returned by the strategy
- */
- public StreamWindow createWindow(long start, long end, long margin, StreamWindowStrategy strategy) {
- return strategy.createWindow(start, end, margin, this);
- }
-
- /**
- * Creates a window using {@code OnHeapStrategy.INSTANCE}, i.e. with {@code StorageType.ONHEAP}.
- *
- * @param start window start
- * @param end window end
- * @param margin window margin
- * @return the newly created window
- */
- public StreamWindow createWindow(long start, long end, long margin) {
- return OnHeapStrategy.INSTANCE.createWindow(start, end, margin, this);
- }
-
- /**
-  * Closes every pooled database and empties the pool.
-  *
-  * <p>Runs under the same {@code dbPool} lock as {@code createMapDB}, so the map is never
-  * iterated while another thread is adding a database to it.</p>
-  */
- public void close() {
-     synchronized (dbPool) {
-         for (DB db : dbPool.values()) {
-             db.close();
-         }
-         dbPool.clear();
-     }
- }
-
- /**
- * Strategy that decides how a window is created for the given bounds, using the supplied repository.
- */
- public interface StreamWindowStrategy {
- StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository);
- }
-
- /**
- * Strategy that always requests {@code StorageType.ONHEAP}, regardless of the window bounds.
- */
- public static class OnHeapStrategy implements StreamWindowStrategy {
- public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();
-
- @Override
- public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
- return repository.createWindow(start, end, margin, StorageType.ONHEAP);
- }
- }
-
- /**
-  * Strategy that picks the storage type from the window length ({@code end - start}):
-  * <ul>
-  * <li>length &lt;= {@code onheapWindowSizeLimit}: {@code ONHEAP}</li>
-  * <li>length &lt;= {@code offheapWindowSizeLimit}: {@code DIRECT_MEMORY}</li>
-  * <li>otherwise: {@code FILE_RAF}</li>
-  * </ul>
-  * The default limits are 3600 * 1000 and 5 * 3600 * 1000 (presumably milliseconds - confirm against callers).
-  */
- public static class WindowSizeStrategy implements StreamWindowStrategy {
-     private static final long ONE_HOUR = 3600 * 1000;
-     private static final long FIVE_HOURS = 5 * 3600 * 1000;
-     private final long onheapWindowSizeLimit;
-     private final long offheapWindowSizeLimit;
-
-     // NOTE(review): this shared instance is not final; left as-is so existing callers keep compiling.
-     public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR, FIVE_HOURS);
-
-     /**
-      * @param onheapWindowSizeLimit  largest window length still kept on heap
-      * @param offheapWindowSizeLimit largest window length still kept in direct memory
-      * @throws IllegalStateException if {@code offheapWindowSizeLimit < onheapWindowSizeLimit}
-      */
-     public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit) {
-         this.offheapWindowSizeLimit = offheapWindowSizeLimit;
-         this.onheapWindowSizeLimit = onheapWindowSizeLimit;
-
-         if (this.offheapWindowSizeLimit < this.onheapWindowSizeLimit) {
-             throw new IllegalStateException("offheapWindowSizeLimit " + this.offheapWindowSizeLimit + " < onheapWindowSizeLimit " + this.onheapWindowSizeLimit);
-         }
-     }
-
-     @Override
-     public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
-         long windowLength = end - start;
-         if (windowLength <= onheapWindowSizeLimit) {
-             return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.ONHEAP);
-         }
-         // windowLength > onheapWindowSizeLimit is already guaranteed here, so only the upper
-         // bound needs checking (the former non-short-circuit '&' test was redundant).
-         if (windowLength <= offheapWindowSizeLimit) {
-             return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
-         }
-         return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.FILE_RAF);
-     }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
deleted file mode 100644
index 73adee6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.utils.SerializableUtils;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.jetbrains.annotations.NotNull;
-import org.mapdb.DataInput2;
-import org.mapdb.DataOutput2;
-import org.mapdb.serializer.GroupSerializerObjectArray;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @deprecated performance is worse, should investigate.
- */
-public class CachedEventGroupSerializer extends GroupSerializerObjectArray<PartitionedEvent[]> {
- private Map<Integer, StreamPartition> hashCodePartitionDict = new HashMap<>();
-
- private void writePartitionedEvent(DataOutput2 out, PartitionedEvent event) throws IOException {
- out.packLong(event.getPartitionKey());
- int partitionHashCode = 0;
- if (event.getPartition() != null) {
- partitionHashCode = event.getPartition().hashCode();
- if (!hashCodePartitionDict.containsKey(partitionHashCode)) {
- hashCodePartitionDict.put(partitionHashCode, event.getPartition());
- }
- }
- out.packInt(partitionHashCode);
- if (event.getEvent() != null) {
- byte[] eventBytes = SerializableUtils.serializeToCompressedByteArray(event.getEvent());
- out.packInt(eventBytes.length);
- out.write(eventBytes);
- } else {
- out.packInt(0);
- }
- }
-
- private PartitionedEvent readPartitionedEvent(DataInput2 in) throws IOException {
- PartitionedEvent event = new PartitionedEvent();
- event.setPartitionKey(in.unpackLong());
- int partitionHashCode = in.unpackInt();
- if (partitionHashCode != 0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
- event.setPartition(hashCodePartitionDict.get(partitionHashCode));
- }
- int eventBytesLen = in.unpackInt();
- if (eventBytesLen > 0) {
- byte[] eventBytes = new byte[eventBytesLen];
- in.readFully(eventBytes);
- event.setEvent((StreamEvent) SerializableUtils.deserializeFromCompressedByteArray(eventBytes, "Deserialize event from bytes"));
- }
- return event;
- }
-
- /**
-  * Writes the array length followed by each event in order.
-  */
- @Override
- public void serialize(DataOutput2 out, PartitionedEvent[] value) throws IOException {
-     final int count = value.length;
-     out.packInt(count);
-     for (int i = 0; i < count; i++) {
-         writePartitionedEvent(out, value[i]);
-     }
- }
-
- /**
-  * Reads the array length, then that many events. {@code available} is not used.
-  */
- @Override
- public PartitionedEvent[] deserialize(DataInput2 in, int available) throws IOException {
-     final PartitionedEvent[] events = new PartitionedEvent[in.unpackInt()];
-     int index = 0;
-     while (index < events.length) {
-         events[index] = readPartitionedEvent(in);
-         index++;
-     }
-     return events;
- }
-
- /**
- * Always reports this serializer as trusted.
- */
- @Override
- public boolean isTrusted() {
- return true;
- }
-
- /**
-  * Two event groups are considered equal when their first events carry the same timestamp.
-  *
-  * <p>Empty groups have no first event to compare: an empty group equals only another empty
-  * group (previously this threw {@code ArrayIndexOutOfBoundsException}).</p>
-  */
- @Override
- public boolean equals(PartitionedEvent[] a1, PartitionedEvent[] a2) {
-     if (a1 == a2) {
-         return true;
-     }
-     if (a1 == null || a2 == null) {
-         return false;
-     }
-     if (a1.length == 0 || a2.length == 0) {
-         return a1.length == a2.length;
-     }
-     return a1[0].getTimestamp() == a2[0].getTimestamp();
- }
-
- /**
-  * Hashes the whole event array with {@code HashCodeBuilder}.
-  *
-  * <p>NOTE(review): {@code seed} is not mixed in, and the hash covers every event while
-  * {@code equals} only looks at the first timestamp - confirm whether that mismatch matters.</p>
-  */
- @Override
- public int hashCode(@NotNull PartitionedEvent[] events, int seed) {
-     final HashCodeBuilder builder = new HashCodeBuilder();
-     builder.append(events);
-     return builder.toHashCode();
- }
-
- /**
-  * Orders event groups by the timestamp of their first event; if either group is empty the
-  * groups compare as equal.
-  *
-  * <p>Uses {@link Long#compare(long, long)}: narrowing the {@code long} difference to
-  * {@code int} could truncate and report the wrong sign for timestamps far apart.</p>
-  */
- @Override
- public int compare(PartitionedEvent[] o1, PartitionedEvent[] o2) {
-     if (o1.length == 0 || o2.length == 0) {
-         return 0;
-     }
-     return Long.compare(o1[0].getTimestamp(), o2[0].getTimestamp());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
deleted file mode 100644
index 55efcaf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.utils.SerializableUtils;
-import org.jetbrains.annotations.NotNull;
-import org.mapdb.DataInput2;
-import org.mapdb.DataOutput2;
-import org.mapdb.Serializer;
-import org.mapdb.serializer.GroupSerializer;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-
-/**
- * {@code GroupSerializer} for {@code PartitionedEvent[]} values that stores each group as a
- * compressed {@code byte[]} and leaves all value-array handling to {@code Serializer.BYTE_ARRAY}.
- * Groups are converted with {@code SerializableUtils} on the way in and out.
- */
-public class PartitionedEventGroupSerializer implements GroupSerializer<PartitionedEvent[]> {
- // All value-array operations are forwarded to MapDB's byte-array serializer.
- private static final GroupSerializer<byte[]> delegate = Serializer.BYTE_ARRAY;
-
- // Searches using the serialized form of the key.
- @Override
- public int valueArraySearch(Object keys, PartitionedEvent[] key) {
- return delegate.valueArraySearch(keys, serialize(key));
- }
-
- // NOTE(review): the comparator is handed to the byte[] delegate together with the serialized key;
- // confirm that callers supply a comparator that works on byte[].
- @SuppressWarnings("rawtypes")
- @Override
- public int valueArraySearch(Object keys, PartitionedEvent[] key, Comparator comparator) {
- return delegate.valueArraySearch(keys, serialize(key), comparator);
- }
-
- @Override
- public void valueArraySerialize(DataOutput2 out, Object vals) throws IOException {
- delegate.valueArraySerialize(out, vals);
- }
-
- @Override
- public Object valueArrayDeserialize(DataInput2 in, int size) throws IOException {
- return delegate.valueArrayDeserialize(in, size);
- }
-
- // Fetches the stored bytes at pos and turns them back into an event group.
- @Override
- public PartitionedEvent[] valueArrayGet(Object vals, int pos) {
- return deserialize(delegate.valueArrayGet(vals, pos));
- }
-
- @Override
- public int valueArraySize(Object vals) {
- return delegate.valueArraySize(vals);
- }
-
- @Override
- public Object valueArrayEmpty() {
- return delegate.valueArrayEmpty();
- }
-
- // Serializes the new group before inserting it into the delegate's value array.
- @Override
- public Object valueArrayPut(Object vals, int pos, PartitionedEvent[] newValue) {
- return delegate.valueArrayPut(vals, pos, serialize(newValue));
- }
-
- // Serializes the new group before replacing the entry at pos.
- @Override
- public Object valueArrayUpdateVal(Object vals, int pos, PartitionedEvent[] newValue) {
- return delegate.valueArrayUpdateVal(vals, pos, serialize(newValue));
- }
-
- // NOTE(review): objects is forwarded without serialization; if its elements are
- // PartitionedEvent[] rather than byte[] the byte-array delegate may reject them - confirm.
- @Override
- public Object valueArrayFromArray(Object[] objects) {
- return delegate.valueArrayFromArray(objects);
- }
-
- @Override
- public Object valueArrayCopyOfRange(Object vals, int from, int to) {
- return delegate.valueArrayCopyOfRange(vals, from, to);
- }
-
- @Override
- public Object valueArrayDeleteValue(Object vals, int pos) {
- return delegate.valueArrayDeleteValue(vals, pos);
- }
-
- // Writes the group as a compressed byte array through the delegate.
- @Override
- public void serialize(@NotNull DataOutput2 out, @NotNull PartitionedEvent[] value) throws IOException {
- delegate.serialize(out, serialize(value));
- }
-
- // Converts an event group to its compressed byte form.
- private static byte[] serialize(PartitionedEvent[] events) {
- return SerializableUtils.serializeToCompressedByteArray(events);
- }
-
- // Reads a byte array through the delegate and converts it back to an event group.
- @Override
- public PartitionedEvent[] deserialize(@NotNull DataInput2 input, int available) throws IOException {
- return deserialize(delegate.deserialize(input, available));
- }
-
- // Converts compressed bytes back to an event group.
- private static PartitionedEvent[] deserialize(byte[] bytes) {
- return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes, "deserialize as stream event");
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
deleted file mode 100644
index 5378c67..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.util.Comparator;
-import java.util.Objects;
-
-/**
- * Orders {@link PartitionedEvent}s by ascending timestamp, with {@code null} sorting before any
- * non-null event.
- *
- * <p>Zero is returned only for events that are {@code Objects.equals}-equal. Two distinct events
- * with the same timestamp never compare as equal: the first argument is reported as smaller,
- * whichever way round they are passed.</p>
- *
- * <p>TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?.</p>
- */
-public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
-    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();
-
-    @Override
-    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
-        // Covers identical references, two nulls, and equals()-equal events.
-        if (Objects.equals(o1, o2)) {
-            return 0;
-        }
-        // From here on at most one side can be null.
-        if (o2 == null) {
-            return 1;
-        }
-        if (o1 == null) {
-            return -1;
-        }
-        // Unstable Sorting Algorithm: ties on timestamp resolve to -1.
-        return o1.getTimestamp() <= o2.getTimestamp() ? -1 : 1;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
deleted file mode 100644
index fb5ba72..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamSortHandler;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamWindow;
-import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-public class StreamSortWindowHandlerImpl implements StreamSortHandler {
- private static final Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
- private StreamWindowManager windowManager;
- private StreamSortSpec streamSortSpecSpec;
- private PartitionedEventCollector outputCollector;
- private String streamId;
-
- /**
-  * Initializes the handler for one stream: builds the window manager from the sort spec's
-  * window period and margin, then stores the spec, stream id and output collector.
-  *
-  * @param streamId           id of the stream being sorted
-  * @param streamSortSpecSpec sort specification supplying the window period and margin
-  * @param outputCollector    collector handed to the window manager and used for dropped events
-  */
- public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
-     final Period windowPeriod = Period.parse(streamSortSpecSpec.getWindowPeriod());
-     this.windowManager = new StreamWindowManagerImpl(
-         windowPeriod,
-         streamSortSpecSpec.getWindowMargin(),
-         PartitionedEventTimeOrderingComparator.INSTANCE,
-         outputCollector);
-
-     this.streamSortSpecSpec = streamSortSpecSpec;
-     this.streamId = streamId;
-     this.outputCollector = outputCollector;
- }
-
- /**
-  * Entry point to manage window lifecycle.
-  *
-  * <p>The event is offered to every alive window. If none takes it and the window manager does
-  * not reject its timestamp, a new window is created for it. An event that ends up in no
-  * window is dropped through the output collector.</p>
-  *
-  * @param event incoming partitioned event
-  */
- public void nextEvent(PartitionedEvent event) {
-     final long eventTime = event.getEvent().getTimestamp();
-     boolean accepted = false;
-
-     synchronized (this.windowManager) {
-         for (StreamWindow candidate : this.windowManager.getWindows()) {
-             if (!candidate.alive()) {
-                 continue;
-             }
-             if (candidate.add(event)) {
-                 accepted = true;
-             }
-         }
-
-         // No window found for the event but not too late being rejected
-         if (!accepted && !windowManager.reject(eventTime)) {
-             // later then all events, create later window
-             final StreamWindow created = windowManager.addNewWindow(eventTime);
-             if (created.add(event)) {
-                 LOG.info("Created {} of {} at {}", created, this.streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(eventTime));
-                 accepted = true;
-             }
-         }
-     }
-
-     if (accepted) {
-         return;
-     }
-     if (LOG.isDebugEnabled()) {
-         LOG.debug("Drop expired event {}", event);
-     }
-     outputCollector.drop(event);
- }
-
- /**
- * Forwards the clock tick to the window manager.
- */
- @Override
- public void onTick(StreamTimeClock clock, long globalSystemTime) {
- windowManager.onTick(clock, globalSystemTime);
- }
-
- /**
- * Closes the window manager. An {@code IOException} raised while closing is logged and not rethrown.
- */
- @Override
- public void close() {
- try {
- windowManager.close();
- } catch (IOException e) {
- LOG.error("Got exception while closing window manager", e);
- }
- }
-
- /**
- * Returns the superclass string representation unchanged.
- */
- @Override
- public String toString() {
- return super.toString();
- }
-
- /**
-  * Uses the hash code of the sort spec.
-  *
-  * @throws NullPointerException if no sort spec has been set yet
-  */
- @Override
- public int hashCode() {
-     if (streamSortSpecSpec != null) {
-         return streamSortSpecSpec.hashCode();
-     }
-     throw new NullPointerException("streamSortSpec is null");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
deleted file mode 100644
index 73a63b4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
-import org.apache.commons.lang3.time.StopWatch;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * StreamSortedWindow based on MapDB to support off-heap or disk storage.
- * Stable sorting algorithm
- * See <a href="http://www.mapdb.org">http://www.mapdb.org</a>
- */
-public class StreamSortedWindowInMapDB extends BaseStreamWindow {
- private final String mapId;
- private BTreeMap<Long, PartitionedEvent[]> btreeMap;
- private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
- private final AtomicInteger size;
- private long replaceOpCount = 0;
- private static final PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();
-
- /**
- * Creates a window whose events are kept in a MapDB BTree map named {@code mapId} inside {@code db}.
- * The map is keyed by {@code Long} and stores {@code PartitionedEvent[]} groups through
- * {@code PartitionedEventGroupSerializer}; it is created if missing, otherwise opened.
- *
- * @param start window start, passed to the base window
- * @param end window end, passed to the base window
- * @param margin window margin, passed to the base window
- * @param db MapDB database that hosts the map
- * @param mapId physical map id, used to decide whether to reuse or not.
- */
- @SuppressWarnings("unused")
- public StreamSortedWindowInMapDB(long start, long end, long margin, DB db, String mapId) {
- super(start, end, margin);
- this.mapId = mapId;
- try {
- btreeMap = db.<Long, StreamEvent>treeMap(mapId)
- .keySerializer(Serializer.LONG)
- .valueSerializer(STREAM_EVENT_GROUP_SERIALIZER)
- .createOrOpen();
- LOG.debug("Created BTree map {}", mapId);
- } catch (Error error) {
- // NOTE(review): the Error is only logged (at info level) and btreeMap stays null,
- // so later calls that touch the map would fail with a NullPointerException - confirm intent.
- LOG.info("Failed create BTree {}", mapId, error);
- }
- size = new AtomicInteger(0);
- }
-
- /**
-  * Assumed: most of adding operation will do putting only and few require replacing.
-  * <ol>
-  * <li>
-  * First of all, always try to put with created event directly
-  * </li>
-  * <li>
-  * If not absent (key already exists), then append and replace,
-  * replace operation will cause more consumption
-  * </li>
-  * </ol>
-  *
-  * @param event coming-in event
-  * @return {@code true} when the event was stored, {@code false} when its timestamp is not accepted by this window
-  * @throws IllegalStateException if the merged group could not replace the existing entry
-  */
- @Override
- public synchronized boolean add(PartitionedEvent event) {
-     long timestamp = event.getEvent().getTimestamp();
-     if (!accept(timestamp)) {
-         return false;
-     }
-
-     // NOTE(review): a false result is treated as "stored as a new entry" and a true result as
-     // "timestamp already present" - confirm against putIfAbsentBoolean in the MapDB version in use.
-     boolean absent = btreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[] {event});
-     if (!absent) {
-         size.incrementAndGet();
-         return true;
-     }
-
-     if (LOG.isDebugEnabled()) {
-         LOG.debug("Duplicated timestamp {}, will reduce performance as replacing", timestamp);
-     }
-     // Append the event to the group already stored under this timestamp.
-     PartitionedEvent[] oldValue = btreeMap.get(timestamp);
-     PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1] : Arrays.copyOf(oldValue, oldValue.length + 1);
-     newValue[newValue.length - 1] = event;
-     PartitionedEvent[] removedValue = btreeMap.replace(timestamp, newValue);
-     replaceOpCount++;
-     if (replaceOpCount % 1000 == 0) {
-         LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance", replaceOpCount);
-     }
-     if (removedValue == null) {
-         // oldValue may be null here; guard it so the intended exception is not masked by an NPE.
-         int oldLength = oldValue == null ? 0 : oldValue.length;
-         throw new IllegalStateException("Failed to replace key " + timestamp + " with " + newValue.length + " entities array to replace old " + oldLength + " entities array");
-     }
-     size.incrementAndGet();
-     return true;
- }
-
- @Override
- protected synchronized void flush(PartitionedEventCollector collector) {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- btreeMap.valueIterator().forEachRemaining((events) -> {
- for (PartitionedEvent event : events) {
- collector.emit(event);
- }
- });
- btreeMap.clear();
- replaceOpCount = 0;
- stopWatch.stop();
- LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
- size.set(0);
- }
-
- /**
- * Closes the base window first, then the underlying BTree map, and logs the map id.
- */
- @Override
- public synchronized void close() {
- super.close();
- btreeMap.close();
- LOG.info("Closed {}", this.mapId);
- }
-
- /**
- * Returns the event counter maintained by {@code add} and reset by {@code flush}.
- */
- @Override
- public synchronized int size() {
- return size.get();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
deleted file mode 100644
index ed000f1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
-
-import com.google.common.collect.TreeMultiset;
-import org.apache.commons.lang3.time.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Comparator;
-
-public class StreamSortedWindowOnHeap extends BaseStreamWindow {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
-    private final TreeMultiset<PartitionedEvent> treeMultisetCache;
-
-    /**
-     * Creates a window whose accepted events are kept in a {@link TreeMultiset}
-     * ordered by the given comparator.
-     *
-     * @param start      start time.
-     * @param end        end time.
-     * @param margin     margin time.
-     * @param comparator ordering applied to the buffered events.
-     */
-    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator) {
-        super(start, end, margin);
-        this.treeMultisetCache = TreeMultiset.create(comparator);
-    }
-
-    /**
-     * Creates a window ordered by a {@code PartitionedEventTimeOrderingComparator}.
-     *
-     * @param start  start time.
-     * @param end    end time.
-     * @param margin margin time.
-     */
-    public StreamSortedWindowOnHeap(long start, long end, long margin) {
-        this(start, end, margin, new PartitionedEventTimeOrderingComparator());
-    }
-
-    /**
-     * Buffers the event when its timestamp is accepted by this window.
-     *
-     * @param partitionedEvent event to buffer.
-     * @return {@code true} when the event was buffered, {@code false} when it was ignored.
-     */
-    @Override
-    public boolean add(PartitionedEvent partitionedEvent) {
-        synchronized (treeMultisetCache) {
-            final boolean accepted = accept(partitionedEvent.getEvent().getTimestamp());
-            if (!accepted) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("{} is not acceptable, ignored", partitionedEvent);
-                }
-                return false;
-            }
-            treeMultisetCache.add(partitionedEvent);
-            return true;
-        }
-    }
-
-    /**
-     * Emits all buffered events in multiset order and then empties the buffer.
-     *
-     * @param collector receives each buffered event through {@code emit}.
-     */
-    @Override
-    protected void flush(PartitionedEventCollector collector) {
-        synchronized (treeMultisetCache) {
-            final StopWatch timer = new StopWatch();
-            timer.start();
-            for (PartitionedEvent buffered : treeMultisetCache) {
-                collector.emit(buffered);
-            }
-            // Capture the count before clearing so the log line reports what was flushed.
-            final int flushed = treeMultisetCache.size();
-            treeMultisetCache.clear();
-            timer.stop();
-            LOG.info("Flushed {} events in {} ms from {}", flushed, timer.getTime(), this.toString());
-        }
-    }
-
-    /**
-     * Returns the number of buffered events.
-     *
-     * @return current size of the multiset.
-     */
-    @Override
-    public int size() {
-        synchronized (treeMultisetCache) {
-            final int buffered = treeMultisetCache.size();
-            return buffered;
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
deleted file mode 100644
index e5be786..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.common.DateTimeUtil;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * In memory thread-safe time clock service.
- * TODO: maybe need to synchronize time clock globally, how to?
- */
-public class StreamTimeClockInLocalMemory implements StreamTimeClock {
- private final AtomicLong currentTime;
- private final String streamId;
-
- public StreamTimeClockInLocalMemory(String streamId, long initialTime) {
- this.streamId = streamId;
- this.currentTime = new AtomicLong(initialTime);
- }
-
- public StreamTimeClockInLocalMemory(String streamId) {
- this(streamId, 0L);
- }
-
- @Override
- public void moveForward(long timestamp) {
- if (timestamp < currentTime.get()) {
- throw new IllegalArgumentException(timestamp + " < " + currentTime.get() + ", should not move time back");
- }
- this.currentTime.set(timestamp);
- }
-
- @Override
- public String getStreamId() {
- return streamId;
- }
-
- @Override
- public long getTime() {
- return currentTime.get();
- }
-
- @Override
- public String toString() {
- return String.format("StreamClock[streamId=%s, now=%s]", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
deleted file mode 100644
index b59918d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
- private static final long serialVersionUID = -2770823821511195343L;
- private static final Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
- private final Map<String, StreamTimeClock> streamIdTimeClockMap;
- private Timer timer;
-
- private final Map<StreamTimeClockListener, String> listenerStreamIdMap;
- private static final AtomicInteger num = new AtomicInteger();
-
- public StreamTimeClockManagerImpl() {
- listenerStreamIdMap = new HashMap<>();
- streamIdTimeClockMap = new HashMap<>();
- timer = new Timer("StreamScheduler-" + num.getAndIncrement());
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- // Make sure the timer tick happens one by one
- triggerTickOnAll();
- }
- }, 1000, 1000);
- }
-
- /**
- * By default, we could keep the current time clock in memory,
- * Eventually we may need to consider the global time synchronization across all nodes
- * 1) When to initialize window according to start time
- * 2) When to close expired window according to current time
- *
- * @return StreamTimeClock instance.
- */
- @Override
- public StreamTimeClock createStreamTimeClock(String streamId) {
- synchronized (streamIdTimeClockMap) {
- if (!streamIdTimeClockMap.containsKey(streamId)) {
- StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
- LOG.info("Created {}", instance);
- streamIdTimeClockMap.put(streamId, instance);
- } else {
- LOG.warn("TimeClock for stream already existss: " + streamIdTimeClockMap.get(streamId));
- }
- return streamIdTimeClockMap.get(streamId);
- }
- }
-
- @Override
- public StreamTimeClock getStreamTimeClock(String streamId) {
- synchronized (streamIdTimeClockMap) {
- if (!streamIdTimeClockMap.containsKey(streamId)) {
- LOG.warn("TimeClock for stream {} is not initialized before being called, create now", streamId);
- return createStreamTimeClock(streamId);
- }
- return streamIdTimeClockMap.get(streamId);
- }
- }
-
- @Override
- public void removeStreamTimeClock(String streamId) {
- synchronized (streamIdTimeClockMap) {
- if (streamIdTimeClockMap.containsKey(streamId)) {
- streamIdTimeClockMap.remove(streamId);
- LOG.info("Removed TimeClock for stream {}: {}", streamId, streamIdTimeClockMap.get(streamId));
- } else {
- LOG.warn("No TimeClock found for stream {}, nothing to remove", streamId);
- }
- }
- }
-
- @Override
- public void registerListener(String streamId, StreamTimeClockListener listener) {
- synchronized (listenerStreamIdMap) {
- if (listenerStreamIdMap.containsKey(listener)) {
- throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
- }
- LOG.info("Register {} on {}", listener, streamId);
- listenerStreamIdMap.put(listener, streamId);
- }
- }
-
- @Override
- public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
- registerListener(streamClock.getStreamId(), listener);
- }
-
- @Override
- public void removeListener(StreamTimeClockListener listener) {
- listenerStreamIdMap.remove(listener);
- }
-
- @Override
- public synchronized void triggerTickOn(String streamId) {
- int count = 0;
- for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
- if (entry.getValue().equals(streamId)) {
- entry.getKey().onTick(streamIdTimeClockMap.get(streamId), getCurrentSystemTime());
- count++;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
- }
- }
-
- private static long getCurrentSystemTime() {
- return System.currentTimeMillis();
- }
-
- @Override
- public void onTimeUpdate(String streamId, long timestamp) {
- StreamTimeClock timeClock = getStreamTimeClock(streamId);
- if (timeClock == null) {
- return;
- }
- // Trigger time clock only when time moves forward
- if (timestamp >= timeClock.getTime()) {
- timeClock.moveForward(timestamp);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Tick on stream {} with latest time {}", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
- }
- triggerTickOn(streamId);
- }
- }
-
- private void triggerTickOnAll() {
- synchronized (listenerStreamIdMap) {
- for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
- triggerTickOn(entry.getValue());
- }
- }
- }
-
- @Override
- public void close() {
- timer.cancel();
- triggerTickOnAll();
- LOG.info("Closed StreamTimeClockManager {}", this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
deleted file mode 100644
index 4c5154b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamWindow;
-import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
-import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class StreamWindowManagerImpl implements StreamWindowManager {
- private static final Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
- private final TreeMap<Long, StreamWindow> windowBuckets;
- private final PartitionedEventCollector collector;
- private final Period windowPeriod;
- private final long windowMargin;
- @SuppressWarnings("unused")
- private final Comparator<PartitionedEvent> comparator;
- private long rejectTime;
-
- public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector) {
- this.windowBuckets = new TreeMap<>();
- this.windowPeriod = windowPeriod;
- this.windowMargin = windowMargin;
- this.collector = collector;
- this.comparator = comparator;
- }
-
- @Override
- public StreamWindow addNewWindow(long initialTime) {
- synchronized (windowBuckets) {
- if (!reject(initialTime)) {
- Long windowStartTime = TimePeriodUtils.formatMillisecondsByPeriod(initialTime, windowPeriod);
- Long windowEndTime = windowStartTime + TimePeriodUtils.getMillisecondsOfPeriod(windowPeriod);
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(windowStartTime, windowEndTime, windowMargin);
- window.register(collector);
- addWindow(window);
- return window;
- } else {
- throw new IllegalStateException("Failed to create new window, as "
- + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after "
- + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
- }
- }
- }
-
- private void addWindow(StreamWindow window) {
- if (!windowBuckets.containsKey(window.startTime())) {
- windowBuckets.put(window.startTime(), window);
- } else {
- throw new IllegalArgumentException("Duplicated " + window.toString());
- }
- }
-
- @Override
- public void removeWindow(StreamWindow window) {
- synchronized (windowBuckets) {
- windowBuckets.remove(window.startTime());
- }
- }
-
- @Override
- public boolean hasWindow(StreamWindow window) {
- synchronized (windowBuckets) {
- return windowBuckets.containsKey(window.startTime());
- }
- }
-
- @Override
- public boolean hasWindowFor(long timestamp) {
- return getWindowFor(timestamp) != null;
- }
-
- @Override
- public Collection<StreamWindow> getWindows() {
- synchronized (windowBuckets) {
- return windowBuckets.values();
- }
- }
-
- @Override
- public StreamWindow getWindowFor(long timestamp) {
- synchronized (windowBuckets) {
- for (StreamWindow windowBucket : windowBuckets.values()) {
- if (timestamp >= windowBucket.startTime() && timestamp < windowBucket.endTime()) {
- return windowBucket;
- }
- }
- return null;
- }
- }
-
- @Override
- public boolean reject(long timestamp) {
- return timestamp < rejectTime;
- }
-
- @Override
- public void onTick(StreamTimeClock clock, long globalSystemTime) {
- synchronized (windowBuckets) {
- List<StreamWindow> toRemoved = new ArrayList<>();
- List<StreamWindow> aliveWindow = new ArrayList<>();
-
- for (StreamWindow windowBucket : windowBuckets.values()) {
- windowBucket.onTick(clock, globalSystemTime);
- if (windowBucket.rejectTime() > rejectTime) {
- rejectTime = windowBucket.rejectTime();
- }
- }
- for (StreamWindow windowBucket : windowBuckets.values()) {
- if (windowBucket.expired() || windowBucket.endTime() <= rejectTime) {
- toRemoved.add(windowBucket);
- } else {
- aliveWindow.add(windowBucket);
- }
- }
- toRemoved.forEach(this::closeAndRemoveWindow);
- if (toRemoved.size() > 0) {
- LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
- }
- }
- }
-
- private void closeAndRemoveWindow(StreamWindow windowBucket) {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- closeWindow(windowBucket);
- removeWindow(windowBucket);
- stopWatch.stop();
- LOG.info("Removed {} in {} ms", windowBucket, stopWatch.getTime());
- }
-
- private void closeWindow(StreamWindow windowBucket) {
- windowBucket.close();
- }
-
- public void close() {
- synchronized (windowBuckets) {
- LOG.debug("Closing");
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- int count = 0;
- for (StreamWindow windowBucket : getWindows()) {
- count++;
- closeWindow(windowBucket);
- }
- windowBuckets.clear();
- stopWatch.stop();
- LOG.info("Closed {} windows in {} ms", count, stopWatch.getTime());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
deleted file mode 100644
index e9ee892..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.Scheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-
-import java.text.MessageFormat;
-import java.util.*;
-
-/**
- * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics
- * <p>1. onNewConfig() is interface for outside to update new metadata. Upon new metadata, this class will calculate if there is any new topic, removed topic or
- * updated topic</p>
- */
-public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener, SerializationMetadataProvider {
- private static final long serialVersionUID = -5280723341236671580L;
- private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class);
-
- public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
- public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";
-
- // topic to KafkaSpoutWrapper
- private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new HashMap<>();
- private int numOfRouterBolts;
-
- private SpoutSpec cachedSpoutSpec;
-
- private transient KafkaSpoutMetric kafkaSpoutMetric;
-
- @SuppressWarnings("rawtypes")
- private Map conf;
- private TopologyContext context;
- private SpoutOutputCollector collector;
- private final Config config;
- private String topologyId;
- private String spoutName;
- private String routeBoltName;
- @SuppressWarnings("unused")
- private int taskIndex;
- private IMetadataChangeNotifyService changeNotifyService;
- private PartitionedEventSerializer serializer;
- private volatile Map<String, StreamDefinition> sds;
-
-    /**
-     * Creates a spout that uses {@code AlertConstants.DEFAULT_SPOUT_NAME} and
-     * {@code AlertConstants.DEFAULT_ROUTERBOLT_NAME} as spout and router bolt names.
-     * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service.
-     *
-     * @param config              configuration handed to the six-argument constructor.
-     * @param topologyId          topology id handed to the six-argument constructor.
-     * @param changeNotifyService metadata change notification service.
-     * @param numOfRouterBolts    number of router bolts.
-     */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts) {
-        this(
-            config,
-            topologyId,
-            changeNotifyService,
-            numOfRouterBolts,
-            AlertConstants.DEFAULT_SPOUT_NAME,
-            AlertConstants.DEFAULT_ROUTERBOLT_NAME);
-    }
-
-    /**
-     * Stores the given collaborators and names; no service is contacted here.
-     *
-     * @param config              configuration kept for later use.
-     * @param topologyId          used for distinguishing kafka offset for different topologies
-     * @param changeNotifyService metadata change notification service.
-     * @param numOfRouterBolts    used for generating streamId and routing
-     * @param spoutName           used for generating streamId between spout and router bolt
-     * @param routerBoltName      used for generating streamId between spout and router bolt.
-     */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName) {
-        this.spoutName = spoutName;
-        this.routeBoltName = routerBoltName;
-        this.numOfRouterBolts = numOfRouterBolts;
-        this.topologyId = topologyId;
-        this.config = config;
-        this.changeNotifyService = changeNotifyService;
-    }
-
-    /**
-     * @return the spout name given at construction time.
-     */
-    public String getSpoutName() {
-        return this.spoutName;
-    }
-
-    /**
-     * @return the router bolt name given at construction time.
-     */
-    public String getRouteBoltName() {
-        return this.routeBoltName;
-    }
-
- /**
- * the only output field is for StreamEvent.
- *
- * @param declarer
- */
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (int i = 0; i < numOfRouterBolts; i++) {
- String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i);
- declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
- LOG.info("declare stream between spout and streamRouterBolt " + streamId);
- }
- }
-
-    /**
-     * Stores the Storm runtime objects, starts from an empty spout spec, registers this
-     * spout with the change notification service and initializes it, registers the
-     * {@code kafkaSpout} metric and creates the event serializer.
-     *
-     * @param conf      Storm configuration map.
-     * @param context   topology context of this task.
-     * @param collector collector used to emit tuples.
-     */
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        LOG.debug("open method invoked");
-
-        this.conf = conf;
-        this.context = context;
-        this.collector = collector;
-        this.taskIndex = context.getThisTaskIndex();
-
-        // initialize an empty SpoutSpec
-        this.cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>());
-
-        // listener is registered before the service is initialized
-        this.changeNotifyService.registerListener(this);
-        this.changeNotifyService.init(config, MetadataType.SPOUT);
-
-        // register KafkaSpout metric
-        final KafkaSpoutMetric metric = new KafkaSpoutMetric();
-        this.kafkaSpoutMetric = metric;
-        context.registerMetric("kafkaSpout", metric, 60);
-
-        this.serializer = Serializers.newPartitionedEventSerializer(this);
-    }
-
- @Override
- public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
- LOG.info("new metadata is updated " + spec);
- try {
- onReload(spec, sds);
- } catch (Exception ex) {
- LOG.error("error applying new SpoutSpec", ex);
- }
- }
-
-    /**
-     * Asks every active Kafka spout wrapper for its next tuple. An exception from one
-     * wrapper is logged and does not stop the remaining wrappers from being polled.
-     */
-    @Override
-    public void nextTuple() {
-        final Iterator<KafkaSpoutWrapper> wrappers = kafkaSpoutList.values().iterator();
-        while (wrappers.hasNext()) {
-            final KafkaSpoutWrapper current = wrappers.next();
-            try {
-                current.nextTuple();
-            } catch (Exception e) {
-                LOG.error("unexpected exception is caught: {}", e.getMessage(), e);
-            }
-        }
-    }
-
-    /**
-     * Routes the ack to the wrapper that owns the message's topic; the ack is
-     * dropped when no wrapper is registered for that topic.
-     *
-     * @param msgId message id, expected to be a {@code KafkaMessageIdWrapper}.
-     */
-    @Override
-    public void ack(Object msgId) {
-        // decode and get topic
-        final KafkaMessageIdWrapper wrapped = (KafkaMessageIdWrapper) msgId;
-        final KafkaSpoutWrapper owner = kafkaSpoutList.get(wrapped.topic);
-        if (owner == null) {
-            return;
-        }
-        owner.ack(wrapped.id);
-    }
-
- @Override
- public void fail(Object msgId) {
- // decode and get topic
- KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
- LOG.error("Failing message {}, with topic {}", msgId, id.topic);
- KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
- if (spout != null) {
- spout.fail(id.id);
- }
- }
-
-    /**
-     * Deactivates every active Kafka spout wrapper.
-     */
-    @Override
-    public void deactivate() {
-        // Report through the class logger instead of stdout so the message ends up
-        // in the same place as the rest of this spout's output.
-        LOG.info("deactivate");
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.deactivate();
-        }
-    }
-
-    /**
-     * Closes every active Kafka spout wrapper.
-     */
-    @Override
-    public void close() {
-        // Report through the class logger instead of stdout so the message ends up
-        // in the same place as the rest of this spout's output.
-        LOG.info("close");
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.close();
-        }
-    }
-
- private List<String> getTopics(SpoutSpec spoutSpec) {
- List<String> meta = new ArrayList<String>();
- for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
- meta.add(entry.getTopic());
- }
- return meta;
- }
-
- /**
- * Applies a new spout spec: compares its topics with those of the cached spec,
- * creates wrappers for added topics, removes wrappers for dropped topics, updates
- * wrappers for topics present in both, and finally replaces the cached spec, the
- * wrapper map and the stream definitions.
- *
- * <p>All changes are made on a copy of the wrapper map, which is assigned to the
- * field only at the end of the method.
- *
- * @param newMeta new spout specification.
- * @param sds stream definitions handed to created and updated wrappers.
- * @throws Exception propagated from the calls made while creating, removing or updating wrappers.
- */
- @SuppressWarnings("unchecked")
- public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception {
- // calculate topic create/remove/update
- List<String> topics = getTopics(newMeta);
- List<String> cachedTopcies = getTopics(cachedSpoutSpec);
- // NOTE(review): the unchecked suppression is presumably for these CollectionUtils calls - confirm
- Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopcies);
- Collection<String> removeTopics = CollectionUtils.subtract(cachedTopcies, topics);
- Collection<String> updateTopics = CollectionUtils.intersection(topics, cachedTopcies);
-
- LOG.info("Topics were added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics);
-
- // build lookup table for scheme
- // (topic -> scheme class name, and topic -> data source properties)
- Map<String, String> newSchemaName = new HashMap<String, String>();
- Map<String, Map<String, String>> dataSourceProperties = new HashMap<>();
- for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
- newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
- dataSourceProperties.put(ds.getTopic(), ds.getProperties());
- }
-
- // copy and swap
- Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList);
- // iterate new topics and then create KafkaSpout
- for (String topic : newTopics) {
- KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
- if (wrapper != null) {
- LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
- continue;
- }
- // per-topic data source properties take precedence; this.config is the fallback
- KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
- conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
- newKafkaSpoutList.put(topic, newWrapper);
- }
- // iterate remove topics and then close KafkaSpout
- for (String topic : removeTopics) {
- KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
- if (wrapper == null) {
- LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
- continue;
- }
- removeKafkaSpout(wrapper);
- newKafkaSpoutList.remove(topic);
- }
-
- // iterate update topic and then update metadata
- for (String topic : updateTopics) {
- KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
- if (spoutWrapper == null) {
- LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
- continue;
- }
- spoutWrapper.update(newMeta, sds);
- }
-
- // swap
- // (the fields are replaced only here, after every topic has been processed)
- this.cachedSpoutSpec = newMeta;
- this.kafkaSpoutList = newKafkaSpoutList;
- this.sds = sds;
- }
-
- /**
- * make this method protected to make sure unit test can work well
- * Q: Where to persist consumer state, i.e. what offset has been consumed for each topic and partition
- * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId
- * Note1: PartitionManager.committedPath for composing zkState path, _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
- * consumerId by default is EagleConsumer unless it is specified by "stormKafkaEagleConsumer"
- * Note2: put topologyId as part of zkState because one topic by design can be consumed by multiple topologies so one topology needs to know
- * processed offset for itself
- * <p>TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible
- * </p>
- *
- * @param conf
- * @param context
- * @param collector
- * @param topic
- * @param spoutSpec
- * @return
- */
- @SuppressWarnings("rawtypes")
- protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
- String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
- String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM);
- BrokerHosts hosts = null;
- if (configure.hasPath("spout.kafkaBrokerZkBasePath")) {
- hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH));
- } else {
- hosts = new ZkHosts(kafkaBrokerZkQuorum);
- }
- String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
- if (configure.hasPath("spout.stormKafkaTransactionZkPath")) {
- transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath");
- }
- boolean logEventEnabled = false;
- if (configure.hasPath("topology.logEventEnabled")) {
- logEventEnabled = configure.getBoolean("topology.logEventEnabled");
- }
- // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
- String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
- if (configure.hasPath("spout.stormKafkaEagleConsumer")) {
- zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer");
- }
- SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
- // transaction zkServers
- boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
- if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
- ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
- spoutConfig.zkServers = utils.getZkHosts();
- spoutConfig.zkPort = utils.getZkPort();
- } else {
- ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum"));
- spoutConfig.zkServers = utils.getZkHosts();
- spoutConfig.zkPort = utils.getZkPort();
- }
- // transaction update interval
- spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
- // Kafka fetch size
- spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
- // "startOffsetTime" is for test usage, prod should not use this
- if (configure.hasPath("spout.stormKafkaStartOffsetTime")) {
- spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime");
- }
-
- spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
- KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
- SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer, logEventEnabled);
- wrapper.open(conf, context, collectorWrapper);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
- }
- return wrapper;
- }
-
- private MultiScheme createMultiScheme(Map conf, String topic, String schemeClsName) throws Exception {
- Object scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, conf);
- if (scheme instanceof MultiScheme) {
- return (MultiScheme) scheme;
- } else if (scheme instanceof Scheme) {
- return new SchemeAsMultiScheme((Scheme) scheme);
- } else {
- LOG.error("create spout scheme failed.");
- throw new IllegalArgumentException("create spout scheme failed.");
- }
- }
-
- @Override
- public StreamDefinition getStreamDefinition(String streamId) {
- return sds.get(streamId);
- }
-
- /**
- * utility to get list of zkServers and zkPort.(It is assumed that zkPort is same for all zkServers as storm-kafka library requires this though it is not efficient)
- */
- private static class ZkServerPortUtils {
- private List<String> zkHosts = new ArrayList<>();
- private Integer zkPort;
-
- public ZkServerPortUtils(String zkQuorum) {
- String[] zkConnections = zkQuorum.split(",");
- for (String zkConnection : zkConnections) {
- zkHosts.add(zkConnection.split(":")[0]);
- }
- zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
- }
-
- public List<String> getZkHosts() {
- return zkHosts;
- }
-
- public Integer getZkPort() {
- return zkPort;
- }
- }
-
- protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
- try {
- wrapper.close();
- } catch (Exception e) {
- LOG.error("Close wrapper failed. Ignore and continue!", e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
deleted file mode 100644
index 5b7e542..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import org.slf4j.Logger;
-
-/**
- * normally this is used in unit test for convenience.
- */
-public class CreateTopicUtils {
-
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class);
-
- private static final int partitions = 2;
- private static final int replicationFactor = 1;
-
- public static void ensureTopicReady(String zkQuorum, String topic) {
- // ZkConnection zkConnection = new ZkConnection(zkQuorum);
- // ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
- //// ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
- // if (!AdminUtils.topicExists(zkClient, topic)) {
- // LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor "
- // + replicationFactor);
- // AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
- // }
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
deleted file mode 100644
index 3c8c99d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.spout;
-
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * topic to stream metadata lifecycle method
- * one topic may spawn multiple streams, the metadata change includes
- * 1. add/remove stream
- * 2. for a specific stream, groupingstrategy is changed
- * ex1, this stream has more alert bolts than before, then this spout would take more traffic
- * ex2, this stream has less alert bolts than before, then this spout would take less traffic
- */
-public interface ISpoutSpecLCM {
- /**
- * stream metadata is used for SPOUT to filter traffic and route traffic to following groupby bolts.
- *
- * @param metadata
- */
- void update(SpoutSpec metadata, Map<String, StreamDefinition> sds);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
deleted file mode 100644
index 74dea03..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.eagle.common.DateTimeUtil;
-
-/**
- * Created on 2/18/16.
- */
-public class KafkaMessageIdWrapper {
- public Object id;
- public String topic;
- public long timestamp;
-
- public KafkaMessageIdWrapper(Object o) {
- this.id = o;
- }
-
- private static final ObjectMapper objectMapper = new ObjectMapper();
-
- public String toString() {
- try {
- return String.format("KafkaMessageIdWrapper[topic=%s, id=%s, timestamp=%s %s]",
- topic,
- objectMapper.writeValueAsString(id),
- DateTimeUtil.millisecondsToHumanDateWithSeconds(timestamp),
- DateTimeUtil.CURRENT_TIME_ZONE.getID());
- } catch (JsonProcessingException e) {
- throw new IllegalStateException(e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
deleted file mode 100644
index 440db0a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.spout;
-
-import java.util.Map;
-
-
-/**
- * All Scheme implementations should have the following conditions
- * 1) implement Scheme interface
- * 2) has one constructor with topic name as parameter.
- */
-public class SchemeBuilder {
-
- @SuppressWarnings("rawtypes")
- public static Object buildFromClsName(String clsName, String topic, Map conf) throws Exception {
- Object o = Class.forName(clsName).getConstructor(String.class, Map.class).newInstance(topic, conf);
- return o;
- }
-}