You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/07 00:53:34 UTC
[4/6] incubator-eagle git commit: Rebase code base
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
index 18089a9..e6b510a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
index d2473a9..f784456 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
index 76d2294..39baf2b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
@@ -16,14 +16,14 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
public class JavaObjectSerializer implements Serializer<Object> {
@Override
public void serialize(Object value, DataOutput dataOutput) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
index 8d85c76..116b275 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
index 714920e..5a3d77d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
@@ -16,8 +16,10 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -26,7 +28,8 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
import org.apache.eagle.alert.engine.serialization.Serializer;
import org.apache.eagle.alert.engine.utils.CompressionUtils;
-import java.io.*;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
/**
* Stream Metadata Cached Serializer
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
index 113816f..0fb686b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -16,6 +16,11 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -23,11 +28,6 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider
import org.apache.eagle.alert.engine.serialization.Serializer;
import org.apache.eagle.alert.engine.serialization.Serializers;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.BitSet;
-
/**
* @see StreamEvent
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
index 1268cb8..f35da39 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
@@ -16,13 +16,19 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.serialization.Serializer;
/**
* Don't serialize streamId
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
index 67de517..4105277 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
@@ -16,16 +16,16 @@
*/
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Don't serialize streamId
*
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
index 2a1541a..940024d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
@@ -1,11 +1,11 @@
package org.apache.eagle.alert.engine.serialization.impl;
-import org.apache.eagle.alert.engine.serialization.Serializer;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index cd23405..f54d5cd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -304,7 +304,12 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception{
String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
- BrokerHosts hosts = new ZkHosts(kafkaBrokerZkQuorum);
+ BrokerHosts hosts = null;
+ if (config.hasPath("spout.kafkaBrokerZkBasePath")) {
+ hosts = new ZkHosts(kafkaBrokerZkQuorum, config.getString("spout.kafkaBrokerZkBasePath"));
+ } else {
+ hosts = new ZkHosts(kafkaBrokerZkQuorum);
+ }
String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
if(config.hasPath("spout.stormKafkaTransactionZkPath")) {
transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
@@ -335,10 +340,14 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime");
}
- spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic));
+ spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic, conf));
KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds,this.serializer);
wrapper.open(conf, context, collectorWrapper);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
+ }
return wrapper;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
index a2c9219..7e0dd3f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
@@ -21,23 +21,34 @@ package org.apache.eagle.alert.engine.spout;
import java.util.Properties;
import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
import org.slf4j.Logger;
/**
* normally this is used in unit test for convenience
*/
public class CreateTopicUtils {
+
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class);
+
private static final int partitions = 2;
private static final int replicationFactor = 1;
- public static void ensureTopicReady(String zkQuorum, String topic){
+
+ /**
+ * Creates {@code topic} with the class-level partition count and replication factor
+ * when it does not exist yet; does nothing when the topic is already present.
+ *
+ * @param zkQuorum ZooKeeper connect string handed to the ZkClient and ZkConnection
+ * @param topic name of the Kafka topic to check and, if missing, create
+ */
+ public static void ensureTopicReady(String zkQuorum, String topic) {
+ // NOTE(review): this ZkConnection is constructed separately from the one ZkClient uses - confirm that is intended.
+ ZkConnection zkConnection = new ZkConnection(zkQuorum);
ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
- if(!AdminUtils.topicExists(zkClient, topic)) {
- LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + replicationFactor);
- AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+ try {
+ ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+ if (!AdminUtils.topicExists(zkUtils, topic)) {
+ LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor "
+ + replicationFactor);
+ AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, new Properties(),
+ RackAwareMode.Disabled$.MODULE$);
}
+ } finally {
+ // Release the client opened above; without this every call left a ZooKeeper client open.
+ zkClient.close();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
index 223f1b5..bfd5da7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
@@ -19,6 +19,8 @@
package org.apache.eagle.alert.engine.spout;
+import java.util.Map;
+
import backtype.storm.spout.Scheme;
@@ -28,8 +30,10 @@ import backtype.storm.spout.Scheme;
* 2) has one constructor with topic name as parameter
*/
public class SchemeBuilder {
- public static Scheme buildFromClsName(String clsName, String topic) throws Exception{
- Object o = Class.forName(clsName).getConstructor(String.class).newInstance(topic);
+
+ /**
+ * Reflectively instantiates the scheme class named {@code clsName}.
+ * A {@code (String topic, Map conf)} constructor is preferred; when the class does not
+ * declare one, the {@code (String topic)} constructor is used instead and {@code conf} is ignored.
+ *
+ * @param clsName fully qualified name of the scheme class to load
+ * @param topic topic name passed to the scheme constructor
+ * @param conf configuration map passed to the two-argument constructor when available
+ * @return the newly created scheme instance
+ * @throws Exception if the class cannot be loaded, neither constructor exists, or construction fails
+ */
+ @SuppressWarnings("rawtypes")
+ public static Scheme buildFromClsName(String clsName, String topic, Map conf) throws Exception{
+ Class<?> schemeCls = Class.forName(clsName);
+ Object o;
+ try {
+ o = schemeCls.getConstructor(String.class, Map.class).newInstance(topic, conf);
+ } catch (NoSuchMethodException e) {
+ // Keep scheme classes that only expose the topic-name constructor working.
+ o = schemeCls.getConstructor(String.class).newInstance(topic);
+ }
return (Scheme)o;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index b37f7b3..2f7cc68 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -18,7 +18,6 @@
*/
package org.apache.eagle.alert.engine.spout;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -112,7 +111,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
Object streamId = convertedTuple.get(1);
StreamDefinition sd = sds.get(streamId);
- if(sd == null){
+ if (sd == null) {
LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds);
spout.ack(newMessageId);
return null;
@@ -141,17 +140,17 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
}
// send message to StreamRouterBolt
PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash);
- if(this.serializer == null){
- delegate.emit(sid, Collections.singletonList(pEvent), newMessageId);
- }else {
+ if (this.serializer == null) {
+ delegate.emit(sid, Collections.singletonList(pEvent), newMessageId);
+ } else {
try {
delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId);
- } catch (IOException e) {
- LOG.error("Failed to serialize {}", pEvent, e);
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ LOG.error("Failed to serialize {}, this message would be ignored!", pEvent, e);
+ spout.ack(newMessageId);
}
}
- }else{
+ } else {
// ******* short-cut ack ********
// we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty
// before moving to next offsets.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
index f526cad..0fbe7b3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
@@ -16,12 +16,15 @@
*/
package org.apache.eagle.alert.engine.utils;
-import com.google.common.io.ByteStreams;
-
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import com.google.common.io.ByteStreams;
+
public class CompressionUtils {
public static byte[] compress(byte[] source) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index bfb5f54..e3be1b7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -24,7 +24,7 @@
"localMode" : "true"
},
"spout" : {
- "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+ "kafkaBrokerZkQuorum": "10.254.194.245:2181",
"kafkaBrokerZkBasePath": "/brokers",
"stormKafkaUseSameZkQuorumWithKafkaBroker": true,
"stormKafkaTransactionZkQuorum": "",
@@ -34,8 +34,8 @@
"stormKafkaFetchSizeBytes": 1048586,
},
"zkConfig" : {
- "zkQuorum" : "sandbox.hortonworks.com:2181",
- "zkRoot" : "/alert",
+ "zkQuorum" : "10.254.194.245:2181",
+ "zkRoot" : "/kafka",
"zkSessionTimeoutMs" : 10000,
"connectionTimeoutMs" : 10000,
"zkRetryTimes" : 3,
@@ -47,21 +47,21 @@
},
"metadataService": {
"context" : "/rest",
- "host" : "localhost",
- "port" : 58080
+ "host" : "127.0.0.1",
+ "port" : 8080
},
"coordinatorService": {
"host": "localhost",
- "port": 58080,
+ "port": 9090,
"context" : "/rest"
}
"metric":{
"sink": {
-// "kafka": {
-// "topic": "alert_metric"
-// "bootstrap.servers": "localhost:6667"
-// }
- "stdout": {}
+ // "kafka": {
+ // "topic": "alert_metric"
+ // "bootstrap.servers": "localhost:6667"
+ // }
+ // "stdout": {}
// "elasticsearch": {
// "hosts": ["localhost:9200"]
// "index": "alert_metric"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
index af99e2c..5e3d3b1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
@@ -21,6 +21,7 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG
+log4j.logger.org.apache.eagle.alert.metric=ERROR
log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG
log4j.logger.org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector=DEBUG
log4j.logger.org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl=DEBUG
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
index 057aa73..927dfd7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -17,11 +17,20 @@
package org.apache.eagle.alert.engine.e2e;
import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.UnitTopologyMain;
@@ -30,7 +39,10 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.service.IMetadataServiceClient;
import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
@@ -53,6 +65,7 @@ import com.typesafe.config.ConfigFactory;
*
*/
public class Integration1 {
+ private static final String SIMPLE_CONFIG = "/simple/application-integration.conf";
private static final Logger LOG = LoggerFactory.getLogger(Integration1.class);
private static final ObjectMapper om = new ObjectMapper();
@@ -61,16 +74,36 @@ public class Integration1 {
inte.args = args;
inte.test_simple_threshhold();
}
-
+
private String[] args;
- private ExecutorService executors = Executors.newFixedThreadPool(5);
+ // Pool for the sample client and the topology; workers are daemon threads so they do not keep the JVM alive.
+ private ExecutorService executors = Executors.newFixedThreadPool(5, new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ // The runnable must be handed to the thread: a bare new Thread() has no target,
+ // so the pool worker would never execute and submitted tasks would never run.
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ // Embedded Kafka broker handle; its only visible assignment is the commented-out line in setup().
+ private static KafkaEmbedded kafka;
+
+ // Class-level hook; the embedded-broker bootstrap below is commented out, so it currently does nothing.
+ @BeforeClass
+ public static void setup() {
+// kafka = new KafkaEmbedded(9092, 2181);
+// makeSureTopic("perfmon_metrics");
+ }
+
+ // Shuts the embedded broker down when one was started; a null handle is skipped.
+ @AfterClass
+ public static void end() {
+ if (kafka != null) {
+ kafka.shutdown();
+ }
+ }
/**
* Assumption:
* <p>
- * start metadata service 8080, better in docker
- * <p>
- * start coordinator service 9090, better in docker
+ * start metadata service 8080 /rest
* <p>
* datasources : perfmon_datasource
* <p>
@@ -84,20 +117,29 @@ public class Integration1 {
*
* @throws InterruptedException
*/
- @Ignore
@Test
public void test_simple_threshhold() throws Exception {
- System.setProperty("config.resource", "/application-integration.conf");
+ System.setProperty("config.resource", SIMPLE_CONFIG);
ConfigFactory.invalidateCaches();
Config config = ConfigFactory.load();
System.out.println("loading metadatas...");
- loadMetadatas("/", config);
+ loadMetadatas("/simple/", config);
System.out.println("loading metadatas done!");
+ if (args == null) {
+ args = new String[] { "-f", "simple/application-integration.conf" };
+ }
+
executors.submit(() -> SampleClient1.main(args));
- executors.submit(() -> UnitTopologyMain.main(args));
+ executors.submit(() -> {
+ try {
+ UnitTopologyMain.main(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
Utils.sleep(1000 * 5l);
while (true) {
@@ -107,40 +149,20 @@ public class Integration1 {
}
}
- /**
- * Test only run expected when there is a missed config in the config file. mark as ignored
- * @throws InterruptedException
- * @throws ExecutionException
- */
- @Ignore
- @Test(expected = ExecutionException.class)
- public void test_typesafe_config() throws InterruptedException, ExecutionException {
- System.setProperty("config.resource", "/application-integration.conf");
+ /**
+ * Ensures {@code topic} exists (1 partition, replication factor 1) on the ZooKeeper quorum
+ * configured in {@link #SIMPLE_CONFIG}; an already existing topic is left untouched.
+ *
+ * @param topic name of the Kafka topic to create when missing
+ */
+ public static void makeSureTopic(String topic) {
+ System.setProperty("config.resource", SIMPLE_CONFIG);
ConfigFactory.invalidateCaches();
- Future<?> f = executors.submit(() -> {
- UnitTopologyMain.main(null);
- });
+ Config config = ConfigFactory.load();
+ ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config);
- f.get();
+ ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
+ try {
+ Properties topicConfiguration = new Properties();
+ ZkConnection zkConnection = new ZkConnection(zkconfig.zkQuorum);
+ ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+ // Guard the creation so a second run against the same broker does not fail on an existing topic.
+ if (!AdminUtils.topicExists(zkUtils, topic)) {
+ AdminUtils.createTopic(zkUtils, topic, 1, 1, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
+ }
+ } finally {
+ // Release the client opened above so repeated calls do not leave clients open.
+ zkClient.close();
+ }
}
-// @Test
-// private void makeSureTopic() {
-// System.setProperty("config.resource", "/application-integration.conf");
-// ConfigFactory.invalidateCaches();
-// Config config = ConfigFactory.load();
-// ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config);
-//
-// CuratorFramework curator = CuratorFrameworkFactory.newClient(
-// zkconfig.zkQuorum,
-// zkconfig.zkSessionTimeoutMs,
-// zkconfig.connectionTimeoutMs,
-// new RetryNTimes(zkconfig.zkRetryTimes, zkconfig.zkRetryInterval)
-// );
-// }
-
public static void proactive_schedule(Config config) throws Exception {
-
try (CoordinatorClient cc = new CoordinatorClient(config)) {
try {
String resp = cc.schedule();
@@ -205,7 +227,7 @@ public class Integration1 {
public void testJson() throws Exception {
{
JavaType type = CollectionType.construct(List.class, SimpleType.construct(Topology.class));
- List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/topologies.json"),
+ List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/simple/topologies.json"),
type);
Topology t = (Topology) l.get(0);
@@ -216,16 +238,16 @@ public class Integration1 {
{
JavaType type = CollectionType.construct(List.class, SimpleType.construct(Publishment.class));
// publishment
- List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/publishments.json"), type);
+ List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/simple/publishments.json"), type);
Publishment p = l.get(0);
Assert.assertEquals("KAFKA", p.getType());
}
- checkAll("/");
+ checkAll("/simple/");
checkAll("/correlation/");
}
- private void checkAll(String base) throws Exception {
+ public static void checkAll(String base) throws Exception {
loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
loadEntities(base + "policies.json", PolicyDefinition.class);
loadEntities(base + "publishments.json", Publishment.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
index 7ea0e7e..a11cc66 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
@@ -67,7 +67,12 @@ public class Integration2 {
Config config = ConfigFactory.load();
Integration1.loadMetadatas("/correlation/", config);
- executors.submit(() -> UnitTopologyMain.main(args));
+ executors.submit(() -> {
+ try {
+ UnitTopologyMain.main(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }});
executors.submit(() -> SampleClient2.main(args));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
new file mode 100644
index 0000000..667d241
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Manual end-to-end driver for the no-data alert scenario: loads the "/nodata/" metadata, starts UnitTopologyMain and SampleClient4NoDataAlert on a thread pool, then calls Integration1.proactive_schedule in an endless loop. Since 6/29/16.
+ */
+public class Integration4NoDataAlert {
+ private String[] args; // never assigned in this class, so both main(args) calls below receive null
+
+ private ExecutorService executors = Executors.newFixedThreadPool(5); // NOTE(review): never shut down in this class
+
+ private static KafkaEmbedded kafka; // never assigned in this class (see FIXME in setup), so end() currently does nothing
+
+ @BeforeClass
+ public static void setup() {
+ // FIXME : start local kafka
+ }
+
+ @AfterClass
+ public static void end() {
+ if (kafka != null) {
+ kafka.shutdown();
+ }
+ }
+ @Test
+ public void testTriggerNoData() throws Exception{ // does not return normally: ends in the while(true) loop below
+ System.setProperty("config.resource", "/nodata/application-nodata.conf"); // select the no-data config resource for ConfigFactory.load()
+ ConfigFactory.invalidateCaches(); // drop cached config so the load() below sees the property set above
+ Config config = ConfigFactory.load();
+
+ System.out.println("loading metadatas...");
+ Integration1.loadMetadatas("/nodata/", config);
+ System.out.println("loading metadatas done!");
+
+
+ executors.submit(() -> { // run the topology on a pool thread
+ try {
+ UnitTopologyMain.main(args);
+ } catch (Exception e) {
+ e.printStackTrace(); // failure is only printed; it does not fail the test
+ }
+ });
+
+ // wait 20 seconds for topology to bring up
+ try{
+ Thread.sleep(20000);
+ }catch(Exception ex){} // NOTE(review): swallows every exception, including InterruptedException, without restoring the interrupt flag
+
+ // send mock data
+ executors.submit(() -> { // run the sample producer on a second pool thread
+ try {
+ SampleClient4NoDataAlert.main(args);
+ } catch (Exception e) {
+ e.printStackTrace(); // failure is only printed; it does not fail the test
+ }
+ });
+
+
+ Utils.sleep(1000 * 5l); // 5 seconds before the first schedule call
+ while (true) { // no exit condition; leaves only if proactive_schedule throws
+ Integration1.proactive_schedule(config);
+
+ Utils.sleep(1000 * 60l * 5); // 5 minutes between schedule calls
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
new file mode 100644
index 0000000..c044da0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class MetadataServiceClientImpTest { // ignored-by-default helper test that reloads all metadata JSON for base path "/" through IMetadataServiceClient
+
+ @Test @Ignore
+ public void test() { // cannot fail: every exception is caught and printed below
+ System.out.println("loading metadatas...");
+ try {
+ System.setProperty("config.resource", "/application-integration.conf"); // select the config resource for ConfigFactory.load()
+ ConfigFactory.invalidateCaches(); // drop cached config so the load() below sees the property set above
+ Config config = ConfigFactory.load();
+ loadMetadatas("/", config);
+ } catch (Exception e) {
+ e.printStackTrace(); // NOTE(review): error is only printed, so the test would pass even when loading fails
+ }
+ System.out.println("loading metadatas done!");
+ }
+
+ private void loadMetadatas(String base, Config config) throws Exception { // base is prefixed to each JSON file name; config is passed to the client constructor
+ IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+ client.clear(); // called before any add*; presumably empties the existing metadata - confirm against the client implementation
+
+ List<Kafka2TupleMetadata> metadata = Integration1.loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+ client.addDataSources(metadata);
+
+ List<PolicyDefinition> policies = Integration1.loadEntities(base + "policies.json", PolicyDefinition.class);
+ client.addPolicies(policies);
+
+ List<Publishment> pubs = Integration1.loadEntities(base + "publishments.json", Publishment.class);
+ client.addPublishments(pubs);
+
+ List<StreamDefinition> defs = Integration1.loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+ client.addStreamDefinitions(defs);
+
+ List<Topology> topos = Integration1.loadEntities(base + "topologies.json", Topology.class);
+ client.addTopologies(topos);
+
+ client.close(); // NOTE(review): not in a finally block, so it is skipped when any call above throws
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java
deleted file mode 100644
index 12fbfea..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.e2e;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-public class MetadataServiceTest {
-
- @Ignore
- @Test
- public void test() {
- System.out.println("loading metadatas...");
- try {
- System.setProperty("config.resource", "/application-integration.conf");
- ConfigFactory.invalidateCaches();
- Config config = ConfigFactory.load();
- loadMetadatas("/", config);
- } catch (Exception e) {
- e.printStackTrace();
- }
- System.out.println("loading metadatas done!");
- }
-
- private void loadMetadatas(String base, Config config) throws Exception {
- IMetadataServiceClient client = new MetadataServiceClientImpl(config);
- client.clear();
-
- List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
- client.addDataSources(metadata);
-
- List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class);
- client.addPolicies(policies);
-
- List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class);
- client.addPublishments(pubs);
-
- List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
- client.addStreamDefinitions(defs);
-
- List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
- client.addTopologies(topos);
-
- client.close();
- }
-
- private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
- ObjectMapper om = new ObjectMapper();
- JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
- List<T> l = om.readValue(Integration1.class.getResourceAsStream(path), type);
- return l;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
index 348bf78..6bff94b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
/**
* @since May 9, 2016
*
@@ -60,7 +63,8 @@ public class SampleClient1 {
long base = System.currentTimeMillis();
AtomicLong msgCount = new AtomicLong();
- try (KafkaProducer<String, String> proceduer = createProceduer()) {
+ Config config = ConfigFactory.load();
+ try (KafkaProducer<String, String> proceduer = createProceduer(config)) {
while (true) {
int hostIndex = 6;
for (int i = 0; i < hostIndex; i++) {
@@ -108,12 +112,10 @@ public class SampleClient1 {
return Pair.of(base, JsonUtils.writeValueAsString(e));
}
- public static KafkaProducer<String, String> createProceduer() {
-
+ public static KafkaProducer<String, String> createProceduer(Config config) {
+ String servers = config.getString("kafkaProducer.bootstrapServers");
Properties configMap = new Properties();
- // String broker_list = zkconfig.zkQuorum;
- // TODO: replace boot strap servers with new workable server
- configMap.put("bootstrap.servers", "localhost:9092");
+ configMap.put("bootstrap.servers", servers);
// configMap.put("metadata.broker.list", broker_list);
configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
index 06148cc..ad0079c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
@@ -26,6 +26,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
/**
* @since May 10, 2016
*
@@ -51,7 +54,6 @@ public class SampleClient2 {
public String message;
public String host;
}
-
/**
* @param args
@@ -60,8 +62,10 @@ public class SampleClient2 {
AtomicLong base1 = new AtomicLong(System.currentTimeMillis());
AtomicLong base2 = new AtomicLong(System.currentTimeMillis());
AtomicLong count = new AtomicLong();
+
+ Config config = ConfigFactory.load();
- try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer()) {
+ try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer(config)) {
while (true) {
nextUuid = String.format(instanceUuidTemp, UUID.randomUUID().toString());
nextReqId = String.format(reqIdTemp, UUID.randomUUID().toString());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
new file mode 100644
index 0000000..f0e0d80
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.utils.Utils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Standalone Kafka producer for the no-data alert scenario: every {@code interval} ms it sends JSON events for host1, host2 and host3 to "noDataAlertTopic", while a background thread temporarily mutes host1 and then host2. Since 6/29/16.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked"}) // class-wide because main() uses KafkaProducer and ProducerRecord as raw types
+public class SampleClient4NoDataAlert {
+ private static final Logger LOG = LoggerFactory.getLogger(SampleClient4NoDataAlert.class);
+ private static long currentTimestamp = 1467240000000L; // synthetic event clock advanced by the send loop, not wall time; presumably epoch millis - confirm
+ private static long interval = 3000L; // passed to Utils.sleep between send rounds and added to currentTimestamp each round
+ private static volatile boolean host1Muted = false; // written by MuteThread, read by the send loop in main()
+ private static volatile boolean host2Muted = false; // written by MuteThread, read by the send loop in main()
+ public static void main(String[] args) throws Exception { // args is not read; the method never returns normally
+ System.setProperty("config.resource", "/nodata/application-nodata.conf"); // select the no-data config resource for ConfigFactory.load()
+ ConfigFactory.invalidateCaches(); // drop cached config so the load() below sees the property set above
+
+ Config config = ConfigFactory.load();
+ KafkaProducer producer = createProducer(config); // NOTE(review): never closed; the loop below has no exit
+ ProducerRecord record = null;
+ Thread x = new MuteThread();
+ x.start(); // mute/unmute timeline runs concurrently with the send loop
+ while(true) {
+ if(!host1Muted) {
+ record = new ProducerRecord("noDataAlertTopic", createEvent("host1"));
+ producer.send(record);
+ }
+ if(!host2Muted) {
+ record = new ProducerRecord("noDataAlertTopic", createEvent("host2"));
+ producer.send(record);
+ }
+ record = new ProducerRecord("noDataAlertTopic", createEvent("host3")); // host3 is never muted
+ producer.send(record);
+ Utils.sleep(interval);
+ currentTimestamp += interval; // advance the synthetic clock by one round
+ }
+ }
+
+ private static class MuteThread extends Thread{ // one-shot timeline: 10s, host1 muted 70s, 10s, host2 muted 70s, then both unmuted
+ @Override
+ public void run(){
+ try {
+ // sleep 10 seconds
+ Thread.sleep(10000);
+ // mute host1
+ LOG.info("mute host1");
+ host1Muted = true;
+ // sleep 70 seconds for triggering no data alert
+ LOG.info("try to sleep 70 seconds for triggering no data alert");
+ Thread.sleep(70000);
+ // unmute host1
+ LOG.info("unmute host1");
+ host1Muted = false;
+ Thread.sleep(10000);
+ // mute host2
+ LOG.info("mute host2");
+ host2Muted = true;
+ // sleep 70 seconds for triggering no data alert
+ LOG.info("try to sleep 70 seconds for triggering no data alert");
+ Thread.sleep(70000);
+ LOG.info("unmute host2");
+ host2Muted = false;
+ }catch(Exception ex){
+ ex.printStackTrace(); // an interrupt or other failure ends the timeline; flags keep their last value
+ }
+ }
+ }
+
+ private static class NoDataEvent{ // payload serialized to JSON by createEvent; fields are exposed via @JsonProperty
+ @JsonProperty
+ long timestamp;
+ @JsonProperty
+ String host;
+ @JsonProperty
+ double value;
+
+ public String toString(){ // used by the LOG.info call in createEvent
+ return "timestamp=" + timestamp + ",host=" + host + ",value=" + value;
+ }
+ }
+
+ private static String createEvent(String host) throws Exception{ // builds one event for host and returns it as a JSON string
+ NoDataEvent e = new NoDataEvent();
+ long expectTS = currentTimestamp + interval; // one interval ahead of the synthetic clock
+ // subtract a random jitter of 0, 1 or 2 (Math.round(2*Math.random())); NOTE(review): the original comment said "1 second" but timestamps look like millis - confirm intent
+ long adjust = Math.round(2*Math.random());
+ e.timestamp = expectTS-adjust;
+ e.host = host;
+ e.value = 25.6; // fixed value for every event
+ LOG.info("sending event {} ", e);
+ ObjectMapper mapper = new ObjectMapper(); // a new mapper is created for every event
+ String value = mapper.writeValueAsString(e);
+ return value;
+ }
+
+
+ public static KafkaProducer<String, String> createProducer(Config config) { // builds a String/String producer from "kafkaProducer.bootstrapServers" in config
+ String servers = config.getString("kafkaProducer.bootstrapServers");
+ Properties configMap = new Properties();
+ configMap.put("bootstrap.servers", servers);
+ configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("request.required.acks", "1"); // NOTE(review): looks like the legacy producer key; the new KafkaProducer setting is "acks" - confirm
+ configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // NOTE(review): deserializer keys are presumably ignored by a producer - confirm
+ configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap);
+ return proceduer; // the caller owns the producer and is responsible for closing it
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
new file mode 100644
index 0000000..f97b1a8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow;
+import org.junit.Test;
+
+/**
+ * Smoke tests for DistinctValuesInTimeWindow and its ValueAndTime/ValueAndTimeComparator types; both tests only print their results and contain no assertions. Since 6/28/16.
+ */
+public class TestDistinctValuesInTimeWindow {
+ @Test
+ public void test(){ // feeds six (value, time) pairs and prints distinctValues(); nothing is asserted
+ DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60*1000); // presumably the window length in ms (60 s) - confirm against the constructor
+ window.send("1", 0);
+ window.send("2", 1000);
+ window.send("3", 1000);
+ window.send("1", 30000);
+ window.send("2", 50000);
+ window.send("1", 62000); // 62000 exceeds the first entry by more than 60*1000; presumably meant to push old entries out - confirm
+ Map<Object, Long> values = window.distinctValues();
+ System.out.println(values);
+ }
+
+ @Test
+ public void testSort(){ // replays the same sequence by hand on a TreeMap ordered by ValueAndTimeComparator; nothing is asserted
+ SortedMap<DistinctValuesInTimeWindow.ValueAndTime, DistinctValuesInTimeWindow.ValueAndTime> timeSortedMap =
+ new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator());
+ DistinctValuesInTimeWindow.ValueAndTime vt1 = new DistinctValuesInTimeWindow.ValueAndTime("1", 0);
+ timeSortedMap.put(vt1, vt1);
+ DistinctValuesInTimeWindow.ValueAndTime vt2 = new DistinctValuesInTimeWindow.ValueAndTime("2", 1000);
+ timeSortedMap.put(vt2, vt2);
+ DistinctValuesInTimeWindow.ValueAndTime vt3 = new DistinctValuesInTimeWindow.ValueAndTime("3", 1000);
+ timeSortedMap.put(vt3, vt3);
+ timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("1", 0)); // removal by a fresh instance; matches only if the comparator treats it as equal to vt1
+ DistinctValuesInTimeWindow.ValueAndTime vt4 = new DistinctValuesInTimeWindow.ValueAndTime("1", 30000);
+ timeSortedMap.put(vt4, vt4);
+ Iterator<?> it = timeSortedMap.entrySet().iterator();
+ while(it.hasNext()){ // print the map in comparator order after the first replacement
+ System.out.println(it.next());
+ }
+ timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("2", 1000)); // same remove-then-reinsert step for value "2"
+ DistinctValuesInTimeWindow.ValueAndTime vt5 = new DistinctValuesInTimeWindow.ValueAndTime("2", 50000);
+ timeSortedMap.put(vt5, vt5);
+ DistinctValuesInTimeWindow.ValueAndTime vt6 = new DistinctValuesInTimeWindow.ValueAndTime("1", 62000); // inserted without removing ("1", 30000) first
+ timeSortedMap.put(vt6, vt6);
+ it = timeSortedMap.entrySet().iterator();
+ while(it.hasNext()){ // print the final ordering
+ System.out.println(it.next());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
new file mode 100644
index 0000000..4149e17
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+/**
+ * Exploratory Siddhi test: fills an event table from expectStream and filters appearStream against that table; output is only printed and nothing is asserted. Since 6/27/16.
+ */
+public class TestEventTable {
+ @Test
+ public void test() throws Exception{
+ ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( // NOTE(review): neither the manager nor the runtime is shut down in this test
+ "define stream expectStream (key string, src string);"+
+ "define stream appearStream (key string, src string);"+
+ "define table expectTable (key string, src string);"+
+ "from expectStream insert into expectTable;" + // every expectStream event is stored in expectTable
+ "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;" // presumably forwards only appearStream events whose key exists in expectTable - confirm
+ );
+
+ runtime.addCallback("outputStream", new StreamCallback() {
+ @Override
+ public void receive(Event[] events) {
+ EventPrinter.print(events); // prints whatever reaches outputStream
+ }
+ });
+
+ runtime.start();
+ runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{"host1","expectStream"});
+ Thread.sleep(2000); // give the engine time to process the insert
+ runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{"host2","expectStream"}); // key "host2" was never sent to expectStream, so presumably nothing is printed - confirm
+ Thread.sleep(2000); // give the callback time to fire before the test ends
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
new file mode 100644
index 0000000..569a3b0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+/**
+ * Exploratory test for a Siddhi-based "no data" alert built from an event table and triggers.
+ *
+ * <p>The active execution plan stores every {@code expectStream} event in {@code expectTable}
+ * and joins two triggers against that table: a periodic one feeding
+ * {@code triggerExpectStream} and a start-up one feeding {@code initAppearStream}. Only
+ * {@code initAppearStream} has a callback, which prints the events it receives. The test makes
+ * no assertions.
+ *
+ * <p>The queries that would compare expected hosts with {@code appearStream} are still
+ * commented out, as is an alternative plan based on a right outer join.
+ *
+ * <p>Since 6/27/16.
+ */
+public class TestNoDataAlert {
+    /**
+     * Registers eight expected hosts and lets the triggers run for five seconds.
+     *
+     * @throws Exception if an event cannot be sent or the sleep is interrupted
+     */
+    @Test
+    public void test() throws Exception {
+        String[] expectHosts = new String[]{"host_1","host_2","host_3","host_4","host_5","host_6","host_7","host_8"};
+//        String[] appearHosts = new String[]{"host_6","host_7","host_8"};
+//        String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"};
+
+        /* NOTE(review): the trigger is named fiveSecTriggerStream but is defined to fire
+         * "at every 1 sec" - confirm which period is intended. */
+        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+            "define stream appearStream (key string, src string);"+
+            "define stream expectStream (key string, src string);"+
+            "define table expectTable (key string, src string);"+
+            "define trigger fiveSecTriggerStream at every 1 sec;"+
+            "define trigger initAppearTriggerStream at 'start';"+
+            "from expectStream insert into expectTable;"+
+            "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;"+
+            "from initAppearTriggerStream join expectTable insert into initAppearStream;"
+//            "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" +
+//            "from joinStream[k2 is null] select k1 insert current events into missingStream;"
+        );
+
+//        ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
+//            "define stream appearStream (key string, src string);"+
+//            "define stream expectStream (key string, src string);"+
+//            "define table expectTable (key string, src string);"+
+//            "from expectStream insert into expectTable;"+
+//            "from appearStream#window.time(10 sec) as l right outer join expectTable as r on l.key == r.key select r.key as k2, l.key as k1 insert current events into joinStream;" +
+//            "from joinStream[k1 is null] select k2 insert current events into missingStream;"
+////            "from joinStream insert into missingStream;"
+//
+//        );
+
+        runtime.addCallback("initAppearStream", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                EventPrinter.print(events);
+            }
+        });
+
+        runtime.start();
+        try {
+            for (String host : expectHosts) {
+                runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{host,"expectStream"});
+            }
+
+//            for(String host:appearHosts) {
+//                runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"});
+//            }
+
+            Thread.sleep(5000);
+
+//            for(String host:appearHosts) {
+//                runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"});
+//            }
+//            Thread.sleep(10000);
+        } finally {
+            /* Always release the runtime, even when a send fails or the sleep is interrupted,
+             * so the periodic trigger does not keep running after this test. */
+            runtime.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
new file mode 100644
index 0000000..6c48def
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.nodata;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.Collector;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Feeds a fixed sequence of events for two hosts into a {@link NoDataPolicyHandler}, once with
+ * a "provided" policy definition value and once with a "dynamic" one.
+ *
+ * <p>Every alert the handler emits is checked by {@code TestCollector}, which requires the
+ * second element of the alert data to be {@code "host2"}.
+ *
+ * <p>Since 6/29/16.
+ */
+public class TestNoDataPolicyHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyHandler.class);
+    private static final String inputStream = "testInputStream";
+    private static final String outputStream = "testOutputStream";
+
+    /** Runs the same event sequence against both policy definition variants. */
+    @Test
+    public void test() throws Exception {
+        test(buildPolicyDef_provided());
+        test(buildPolicyDef_dynamic());
+    }
+
+    /**
+     * Prepares a fresh handler for {@code pd} and sends it the fixed event sequence.
+     *
+     * @param pd policy definition placed into the handler context
+     * @throws Exception if preparing the handler or sending an event fails
+     */
+    @SuppressWarnings("unchecked")
+    public void test(PolicyDefinition pd) throws Exception {
+        Map<String, StreamDefinition> streamDefs = new HashMap<>();
+        streamDefs.put(inputStream, buildStreamDef());
+
+        PolicyHandlerContext handlerContext = new PolicyHandlerContext();
+        handlerContext.setPolicyDefinition(pd);
+
+        NoDataPolicyHandler handler = new NoDataPolicyHandler(streamDefs);
+        handler.prepare(new TestCollector(), handlerContext);
+
+        /* Column i of the three arrays describes the i-th event, in send order. */
+        long[] timestamps = {0, 0, 100, 120, 4000, 50000, 60150, 60450, 75000, 85000};
+        String[] hosts = {"host1", "host2", "host1", "host2", "host2", "host2", "host2", "host2", "host1", "host2"};
+        double[] values = {12.5, 12.6, 20.9, 22.1, 22.1, 22.1, 22.3, 22.9, 41.6, 45.6};
+        for (int i = 0; i < timestamps.length; i++) {
+            handler.send(buildStreamEvt(timestamps[i], hosts[i], values[i]));
+        }
+    }
+
+    /** Collector that checks and logs each emitted alert. */
+    @SuppressWarnings("rawtypes")
+    private static class TestCollector implements Collector {
+        @Override
+        public void emit(Object o) {
+            AlertStreamEvent alert = (AlertStreamEvent) o;
+            Assert.assertEquals("host2", alert.getData()[1]);
+            LOG.info(alert.toString());
+        }
+    }
+
+    /** Policy whose definition value uses the "provided" form and lists host1 and host2. */
+    private PolicyDefinition buildPolicyDef_provided() {
+        return buildPolicyDef("PT1M,provided,1,host,host1,host2");
+    }
+
+    /** Policy whose definition value uses the "dynamic" form. */
+    private PolicyDefinition buildPolicyDef_dynamic() {
+        return buildPolicyDef("PT1M,dynamic,1,host");
+    }
+
+    /** Builds the policy shared by both variants; only the definition value differs. */
+    private PolicyDefinition buildPolicyDef(String definitionValue) {
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setValue(definitionValue);
+        definition.setType("string");
+
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setDefinition(definition);
+        policy.setInputStreams(Arrays.asList(inputStream));
+        policy.setOutputStreams(Arrays.asList(outputStream));
+        policy.setName("nodataalert-test");
+        return policy;
+    }
+
+    /** Stream definition with columns timestamp (LONG), host (STRING) and value (DOUBLE). */
+    private StreamDefinition buildStreamDef() {
+        StreamDefinition definition = new StreamDefinition();
+        definition.setColumns(Arrays.asList(
+            column("timestamp", StreamColumn.Type.LONG),
+            column("host", StreamColumn.Type.STRING),
+            column("value", StreamColumn.Type.DOUBLE)));
+        definition.setDataSource("testDataSource");
+        definition.setStreamId("testStreamId");
+        return definition;
+    }
+
+    /** Creates a single named, typed stream column. */
+    private static StreamColumn column(String name, StreamColumn.Type type) {
+        StreamColumn col = new StreamColumn();
+        col.setName(name);
+        col.setType(type);
+        return col;
+    }
+
+    /** Builds an event on the input stream whose data is {ts, host, value}. */
+    private StreamEvent buildStreamEvt(long ts, String host, double value) {
+        StreamEvent event = new StreamEvent();
+        event.setStreamId(inputStream);
+        event.setTimestamp(ts);
+        event.setData(new Object[]{ts, host, value});
+        return event;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index af79f96..23ddd69 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -18,10 +18,10 @@
package org.apache.eagle.alert.engine.router;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -35,24 +35,25 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* @Since 5/14/16.
*/
public class TestAlertPublisherBolt {
+ @SuppressWarnings("rawtypes")
@Ignore
@Test
public void test() {
Config config = ConfigFactory.load("application-test.conf");
AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
- publisher.init(config);
+ publisher.init(config, new HashMap());
PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
publisher.onPublishChange(spec.getPublishments(), null, null, null);
AlertStreamEvent event = create("testAlertStream");
@@ -95,7 +96,7 @@ public class TestAlertPublisherBolt {
@Test
public void testAlertPublisher() throws Exception {
AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
- List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class);
+ List<Publishment> oldPubs = loadEntities("/publishments1.json", Publishment.class);
List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
alertPublisher.onPublishChange(oldPubs, null, null, null);
alertPublisher.onPublishChange(null, null, newPubs, oldPubs);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
index 8c048cb..2de2073 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -16,16 +16,19 @@
*/
package org.apache.eagle.alert.engine.runner;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
@@ -45,12 +48,17 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.*;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
public class TestStreamRouterBolt {
private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
index 0347d50..a756ebe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
@@ -16,14 +16,9 @@
*/
package org.apache.eagle.alert.engine.serialization;
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import com.esotericsoftware.kryo.*;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.util.BitSet;
+
import org.apache.commons.lang.time.StopWatch;
import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -35,12 +30,20 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.BitSet;
+import backtype.storm.serialization.DefaultKryoFactory;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
public class PartitionedEventSerializerTest {
private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
+ @SuppressWarnings("deprecation")
@Test
public void testPartitionEventSerialization() throws IOException {
PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
@@ -76,6 +79,7 @@ public class PartitionedEventSerializerTest {
Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
}
+ @SuppressWarnings("deprecation")
@Test
public void testPartitionEventSerializationEfficiency() throws IOException {
PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;