You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:06 UTC
[05/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle
problems on eagle-alert module and enable failOnViolation
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
index f50ad15..fe70630 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java
@@ -28,20 +28,20 @@ import org.wso2.siddhi.core.util.EventPrinter;
*/
public class TestNoDataAlert {
@Test
- public void test() throws Exception{
- String[] expectHosts = new String[]{"host_1","host_2","host_3","host_4","host_5","host_6","host_7","host_8"};
+ public void test() throws Exception {
+ String[] expectHosts = new String[] {"host_1", "host_2", "host_3", "host_4", "host_5", "host_6", "host_7", "host_8"};
// String[] appearHosts = new String[]{"host_6","host_7","host_8"};
// String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"};
ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
- "define stream appearStream (key string, src string);"+
- "define stream expectStream (key string, src string);"+
- "define table expectTable (key string, src string);"+
- "define trigger fiveSecTriggerStream at every 1 sec;"+
- "define trigger initAppearTriggerStream at 'start';"+
- "from expectStream insert into expectTable;"+
- "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;"+
- "from initAppearTriggerStream join expectTable insert into initAppearStream;"
+ "define stream appearStream (key string, src string);" +
+ "define stream expectStream (key string, src string);" +
+ "define table expectTable (key string, src string);" +
+ "define trigger fiveSecTriggerStream at every 1 sec;" +
+ "define trigger initAppearTriggerStream at 'start';" +
+ "from expectStream insert into expectTable;" +
+ "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;" +
+ "from initAppearTriggerStream join expectTable insert into initAppearStream;"
// "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" +
// "from joinStream[k2 is null] select k1 insert current events into missingStream;"
);
@@ -65,8 +65,8 @@ public class TestNoDataAlert {
});
runtime.start();
- for(String host: expectHosts) {
- runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{host,"expectStream"});
+ for (String host : expectHosts) {
+ runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {host, "expectStream"});
}
// for(String host:appearHosts) {
@@ -83,17 +83,17 @@ public class TestNoDataAlert {
/**
* only alert when the successive 2 events has number of missing blocks changed
- *from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
+ * from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp;
*/
@Test
- public void testMissingBlock() throws Exception{
+ public void testMissingBlock() throws Exception {
ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
- "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);"+
- "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> "+
- "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and "+
- "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, "+
- "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
- "b.site as site insert into outputStream;"
+ "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);" +
+ "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> " +
+ "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and " +
+ "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, " +
+ "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " +
+ "b.site as site insert into outputStream;"
);
runtime.addCallback("outputStream", new StreamCallback() {
@@ -104,9 +104,9 @@ public class TestNoDataAlert {
});
runtime.start();
- runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
- runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
- runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[]{"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L});
+ runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L});
Thread.sleep(5000);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
index 6305da8..5564b90 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java
@@ -16,10 +16,6 @@
*/
package org.apache.eagle.alert.engine.nodata;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.eagle.alert.engine.Collector;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
@@ -33,6 +29,10 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Since 6/29/16.
*/
@@ -42,13 +42,13 @@ public class TestNoDataPolicyHandler {
private static final String outputStream = "testOutputStream";
@Test
- public void test() throws Exception{
+ public void test() throws Exception {
test(buildPolicyDef_provided());
test(buildPolicyDef_dynamic());
}
@SuppressWarnings("unchecked")
- public void test(PolicyDefinition pd) throws Exception{
+ public void test(PolicyDefinition pd) throws Exception {
Map<String, StreamDefinition> sds = new HashMap<>();
StreamDefinition sd = buildStreamDef();
sds.put("testInputStream", sd);
@@ -71,17 +71,17 @@ public class TestNoDataPolicyHandler {
}
@SuppressWarnings("rawtypes")
- private static class TestCollector implements Collector{
+ private static class TestCollector implements Collector {
@Override
public void emit(Object o) {
- AlertStreamEvent e = (AlertStreamEvent)o;
+ AlertStreamEvent e = (AlertStreamEvent) o;
Object[] data = e.getData();
Assert.assertEquals("host2", data[1]);
LOG.info(e.toString());
}
}
- private PolicyDefinition buildPolicyDef_provided(){
+ private PolicyDefinition buildPolicyDef_provided() {
PolicyDefinition pd = new PolicyDefinition();
PolicyDefinition.Definition def = new PolicyDefinition.Definition();
def.setValue("PT1M,provided,1,host,host1,host2");
@@ -93,7 +93,7 @@ public class TestNoDataPolicyHandler {
return pd;
}
- private PolicyDefinition buildPolicyDef_dynamic(){
+ private PolicyDefinition buildPolicyDef_dynamic() {
PolicyDefinition pd = new PolicyDefinition();
PolicyDefinition.Definition def = new PolicyDefinition.Definition();
def.setValue("PT1M,dynamic,1,host");
@@ -104,7 +104,8 @@ public class TestNoDataPolicyHandler {
pd.setName("nodataalert-test");
return pd;
}
- private StreamDefinition buildStreamDef(){
+
+ private StreamDefinition buildStreamDef() {
StreamDefinition sd = new StreamDefinition();
StreamColumn tsColumn = new StreamColumn();
tsColumn.setName("timestamp");
@@ -124,9 +125,9 @@ public class TestNoDataPolicyHandler {
return sd;
}
- private StreamEvent buildStreamEvt(long ts, String host, double value){
+ private StreamEvent buildStreamEvt(long ts, String host, double value) {
StreamEvent e = new StreamEvent();
- e.setData(new Object[]{ts, host, value});
+ e.setData(new Object[] {ts, host, value});
e.setStreamId(inputStream);
e.setTimestamp(ts);
return e;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
index 02d19b4..84844e7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -16,10 +16,6 @@
*/
package org.apache.eagle.alert.engine.nodata;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.eagle.alert.engine.Collector;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
@@ -34,125 +30,129 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
public class TestNoDataPolicyTimeBatchHandler {
- private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
-
- private static final String inputStream = "testInputStream";
- private static final String outputStream = "testOutputStream";
-
- @Before
- public void setup() {
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testDynamic1() throws Exception {
- Map<String, StreamDefinition> sds = new HashMap<>();
- sds.put("testInputStream", buildStreamDef());
- sds.put("testOutputStream", buildOutputStreamDef());
- NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds);
-
- PolicyHandlerContext context = new PolicyHandlerContext();
- context.setPolicyDefinition(buildPolicyDef_dynamic());
- handler.prepare(new TestCollector(), context);
-
- long now = System.currentTimeMillis();
-
- handler.send(buildStreamEvt(now, "host1", 12.5));
-
- Thread.sleep(2000);
-
- handler.send(buildStreamEvt(now, "host2", 12.6));
- handler.send(buildStreamEvt(now, "host1", 20.9));
- handler.send(buildStreamEvt(now, "host2", 22.1));
- handler.send(buildStreamEvt(now, "host2", 22.1));
-
- Thread.sleep(5000);
-
- handler.send(buildStreamEvt(now, "host2", 22.1));
- handler.send(buildStreamEvt(now, "host2", 22.3));
-
- Thread.sleep(5000);
-
- handler.send(buildStreamEvt(now, "host2", 22.9));
- handler.send(buildStreamEvt(now, "host1", 41.6));
- handler.send(buildStreamEvt(now, "host2", 45.6));
-
- Thread.sleep(1000);
- }
-
- @SuppressWarnings("rawtypes")
- private static class TestCollector implements Collector{
+ private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
+
+ private static final String inputStream = "testInputStream";
+ private static final String outputStream = "testOutputStream";
+
+ @Before
+ public void setup() {
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testDynamic1() throws Exception {
+ Map<String, StreamDefinition> sds = new HashMap<>();
+ sds.put("testInputStream", buildStreamDef());
+ sds.put("testOutputStream", buildOutputStreamDef());
+ NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds);
+
+ PolicyHandlerContext context = new PolicyHandlerContext();
+ context.setPolicyDefinition(buildPolicyDef_dynamic());
+ handler.prepare(new TestCollector(), context);
+
+ long now = System.currentTimeMillis();
+
+ handler.send(buildStreamEvt(now, "host1", 12.5));
+
+ Thread.sleep(2000);
+
+ handler.send(buildStreamEvt(now, "host2", 12.6));
+ handler.send(buildStreamEvt(now, "host1", 20.9));
+ handler.send(buildStreamEvt(now, "host2", 22.1));
+ handler.send(buildStreamEvt(now, "host2", 22.1));
+
+ Thread.sleep(5000);
+
+ handler.send(buildStreamEvt(now, "host2", 22.1));
+ handler.send(buildStreamEvt(now, "host2", 22.3));
+
+ Thread.sleep(5000);
+
+ handler.send(buildStreamEvt(now, "host2", 22.9));
+ handler.send(buildStreamEvt(now, "host1", 41.6));
+ handler.send(buildStreamEvt(now, "host2", 45.6));
+
+ Thread.sleep(1000);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static class TestCollector implements Collector {
@Override
public void emit(Object o) {
- AlertStreamEvent e = (AlertStreamEvent)o;
+ AlertStreamEvent e = (AlertStreamEvent) o;
Object[] data = e.getData();
-
+
LOG.info("alert data: {}, {}", data[1], data[0]);
-
+
Assert.assertEquals("host1", data[1]);
}
}
- private PolicyDefinition buildPolicyDef_dynamic() {
- PolicyDefinition pd = new PolicyDefinition();
- PolicyDefinition.Definition def = new PolicyDefinition.Definition();
- def.setValue("PT5S,dynamic,1,host");
- def.setType("nodataalert");
- pd.setDefinition(def);
- pd.setInputStreams(Arrays.asList(inputStream));
- pd.setOutputStreams(Arrays.asList(outputStream));
- pd.setName("nodataalert-test");
- return pd;
- }
-
- private StreamDefinition buildStreamDef() {
- StreamDefinition sd = new StreamDefinition();
- StreamColumn tsColumn = new StreamColumn();
- tsColumn.setName("timestamp");
- tsColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn hostColumn = new StreamColumn();
- hostColumn.setName("host");
- hostColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn valueColumn = new StreamColumn();
- valueColumn.setName("value");
- valueColumn.setType(StreamColumn.Type.DOUBLE);
-
- sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
- sd.setDataSource("testDataSource");
- sd.setStreamId("testInputStream");
- return sd;
- }
-
- private StreamDefinition buildOutputStreamDef() {
- StreamDefinition sd = new StreamDefinition();
- StreamColumn tsColumn = new StreamColumn();
- tsColumn.setName("timestamp");
- tsColumn.setType(StreamColumn.Type.LONG);
-
- StreamColumn hostColumn = new StreamColumn();
- hostColumn.setName("host");
- hostColumn.setType(StreamColumn.Type.STRING);
-
- StreamColumn valueColumn = new StreamColumn();
- valueColumn.setName("originalStreamName");
- valueColumn.setType(StreamColumn.Type.STRING);
-
- sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
- sd.setDataSource("testDataSource");
- sd.setStreamId("testOutputStream");
- return sd;
- }
-
- private StreamEvent buildStreamEvt(long ts, String host, double value) {
- StreamEvent e = new StreamEvent();
- e.setData(new Object[] { ts, host, value });
- e.setStreamId(inputStream);
- e.setTimestamp(ts);
- return e;
- }
+ private PolicyDefinition buildPolicyDef_dynamic() {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("PT5S,dynamic,1,host");
+ def.setType("nodataalert");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList(inputStream));
+ pd.setOutputStreams(Arrays.asList(outputStream));
+ pd.setName("nodataalert-test");
+ return pd;
+ }
+
+ private StreamDefinition buildStreamDef() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("value");
+ valueColumn.setType(StreamColumn.Type.DOUBLE);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testInputStream");
+ return sd;
+ }
+
+ private StreamDefinition buildOutputStreamDef() {
+ StreamDefinition sd = new StreamDefinition();
+ StreamColumn tsColumn = new StreamColumn();
+ tsColumn.setName("timestamp");
+ tsColumn.setType(StreamColumn.Type.LONG);
+
+ StreamColumn hostColumn = new StreamColumn();
+ hostColumn.setName("host");
+ hostColumn.setType(StreamColumn.Type.STRING);
+
+ StreamColumn valueColumn = new StreamColumn();
+ valueColumn.setName("originalStreamName");
+ valueColumn.setType(StreamColumn.Type.STRING);
+
+ sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
+ sd.setDataSource("testDataSource");
+ sd.setStreamId("testOutputStream");
+ return sd;
+ }
+
+ private StreamEvent buildStreamEvt(long ts, String host, double value) {
+ StreamEvent e = new StreamEvent();
+ e.setData(new Object[] {ts, host, value});
+ e.setStreamId(inputStream);
+ e.setTimestamp(ts);
+ return e;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
index f2027b2..77ab9c3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
@@ -16,12 +16,9 @@
*/
package org.apache.eagle.alert.engine.perf;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.apache.commons.io.FilenameUtils;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
@@ -31,23 +28,26 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
/**
* Since 5/13/16.
*/
public class TestSerDeserPer {
Object[] data = null;
+
@Before
- public void before(){
+ public void before() {
int max = 100;
StringBuilder sb = new StringBuilder();
- for(int i=0; i<max; i++){
+ for (int i = 0; i < max; i++) {
sb.append("a");
}
- data = new Object[]{sb.toString()};
+ data = new Object[] {sb.toString()};
}
private String getTmpPath() {
@@ -55,11 +55,11 @@ public class TestSerDeserPer {
}
@Test
- public void testSerDeserPerf() throws Exception{
+ public void testSerDeserPerf() throws Exception {
Kryo kryo = new Kryo();
String outputPath = FilenameUtils.concat(getTmpPath(), "file.bin");
Output output = new Output(new FileOutputStream(outputPath));
- for(int i=0; i<1000; i++){
+ for (int i = 0; i < 1000; i++) {
kryo.writeObject(output, constructPE());
}
output.close();
@@ -69,7 +69,7 @@ public class TestSerDeserPer {
Assert.assertTrue(someObject.getData().length == 1);
}
- private PartitionedEvent constructPE(){
+ private PartitionedEvent constructPE() {
StreamEvent e = new StreamEvent();
e.setStreamId("testStreamId");
e.setTimestamp(1463159382000L);
@@ -92,11 +92,11 @@ public class TestSerDeserPer {
}
@Test
- public void testSerDeserPerf2() throws Exception{
+ public void testSerDeserPerf2() throws Exception {
Kryo kryo = new Kryo();
String outputPath = FilenameUtils.concat(getTmpPath(), "file2.bin");
Output output = new Output(new FileOutputStream(outputPath));
- for(int i=0; i<1000; i++){
+ for (int i = 0; i < 1000; i++) {
kryo.writeObject(output, constructNewPE());
}
output.close();
@@ -106,7 +106,7 @@ public class TestSerDeserPer {
Assert.assertTrue(someObject.getData().length == 1);
}
- private NewPartitionedEvent constructNewPE(){
+ private NewPartitionedEvent constructNewPE() {
NewPartitionedEvent pe = new NewPartitionedEvent();
pe.setStreamId("testStreamId");
pe.setTimestamp(1463159382000L);
@@ -124,11 +124,11 @@ public class TestSerDeserPer {
}
@Test
- public void testSerDeserPerf3() throws Exception{
+ public void testSerDeserPerf3() throws Exception {
Kryo kryo = new Kryo();
String outputPath = FilenameUtils.concat(getTmpPath(), "file3.bin");
Output output = new Output(new FileOutputStream(outputPath));
- for(int i=0; i<1000; i++){
+ for (int i = 0; i < 1000; i++) {
kryo.writeObject(output, constructNewPE2());
}
output.close();
@@ -138,7 +138,7 @@ public class TestSerDeserPer {
Assert.assertTrue(someObject.getData().length == 1);
}
- private NewPartitionedEvent2 constructNewPE2(){
+ private NewPartitionedEvent2 constructNewPE2() {
NewPartitionedEvent2 pe = new NewPartitionedEvent2();
pe.setStreamId(100);
pe.setTimestamp(1463159382000L);
@@ -168,10 +168,10 @@ public class TestSerDeserPer {
private long partitionKey;
// sort spec
- private String windowPeriod="";
+ private String windowPeriod = "";
private long windowMargin = 30 * 1000;
- public NewPartitionedEvent(){
+ public NewPartitionedEvent() {
}
public String getStreamId() {
@@ -255,7 +255,7 @@ public class TestSerDeserPer {
private long windowPeriod;
private long windowMargin = 30 * 1000;
- public NewPartitionedEvent2(){
+ public NewPartitionedEvent2() {
}
public int getStreamId() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index f3548d8..4bec98d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -58,7 +58,7 @@ import static org.mockito.Mockito.when;
/**
* Since 5/2/16.
*/
-@SuppressWarnings({"rawtypes", "unused"})
+@SuppressWarnings( {"rawtypes", "unused"})
public class TestAlertBolt {
public static final String TEST_STREAM = "test-stream";
@@ -66,22 +66,20 @@ public class TestAlertBolt {
/**
* Following knowledge is guaranteed in
*
+ * @throws Exception Add test case: 2 alerts should be generated even if they are very close to each other in timestamp
* @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{
- * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
- * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
- * }
+ * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
+ * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
+ * }
* }
- *
- * @throws Exception
- *
- * Add test case: 2 alerts should be generated even if they are very close to each other in timestamp
*/
@Test
- public void testAlertBolt() throws Exception{
+ public void testAlertBolt() throws Exception {
final AtomicInteger alertCount = new AtomicInteger();
final Semaphore mutex = new Semaphore(0);
- OutputCollector collector = new OutputCollector(new IOutputCollector(){
+ OutputCollector collector = new OutputCollector(new IOutputCollector() {
int count = 0;
+
@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
alertCount.incrementAndGet();
@@ -91,14 +89,22 @@ public class TestAlertBolt {
System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
return null;
}
+
@Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
@Override
- public void ack(Tuple input) { }
+ public void ack(Tuple input) {
+ }
+
@Override
- public void fail(Tuple input) { }
+ public void fail(Tuple input) {
+ }
+
@Override
- public void reportError(Throwable error) { }
+ public void reportError(Throwable error) {
+ }
});
AlertBolt bolt = createAlertBolt(collector);
@@ -143,27 +149,27 @@ public class TestAlertBolt {
// construct event with "value1"
StreamEvent event1 = new StreamEvent();
- event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+ event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
event1.setMetaVersion("version1");
- Object[] data = new Object[]{"value1"};
+ Object[] data = new Object[] {"value1"};
event1.setData(data);
event1.setStreamId(streamId);
- PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001);
+ PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001);
// construct another event with "value1"
StreamEvent event2 = new StreamEvent();
- event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
+ event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
event2.setMetaVersion("version1");
- data = new Object[]{"value2"};
+ data = new Object[] {"value2"};
event2.setData(data);
event2.setStreamId(streamId);
- PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001);
+ PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001);
Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
bolt.execute(input);
bolt.execute(input2);
- Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
+ Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
Assert.assertEquals(2, alertCount.get());
bolt.cleanup();
}
@@ -183,8 +189,9 @@ public class TestAlertBolt {
@Test
public void testMetadataMismatch() throws Exception {
AtomicInteger failedCount = new AtomicInteger();
- OutputCollector collector = new OutputCollector(new IOutputCollector(){
+ OutputCollector collector = new OutputCollector(new IOutputCollector() {
int count = 0;
+
@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
Assert.assertEquals("testAlertStream", tuple.get(0));
@@ -192,14 +199,23 @@ public class TestAlertBolt {
System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
return null;
}
+
@Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
@Override
- public void ack(Tuple input) { }
+ public void ack(Tuple input) {
+ }
+
@Override
- public void fail(Tuple input) { failedCount.incrementAndGet(); }
+ public void fail(Tuple input) {
+ failedCount.incrementAndGet();
+ }
+
@Override
- public void reportError(Throwable error) { }
+ public void reportError(Throwable error) {
+ }
});
AlertBolt bolt = createAlertBolt(collector);
@@ -267,8 +283,9 @@ public class TestAlertBolt {
@Test
public void testMetaversionConflict() throws Exception {
AtomicInteger failedCount = new AtomicInteger();
- OutputCollector collector = new OutputCollector(new IOutputCollector(){
+ OutputCollector collector = new OutputCollector(new IOutputCollector() {
int count = 0;
+
@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
Assert.assertEquals("testAlertStream", tuple.get(0));
@@ -276,14 +293,23 @@ public class TestAlertBolt {
System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
return null;
}
+
@Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
@Override
- public void ack(Tuple input) { }
+ public void ack(Tuple input) {
+ }
+
@Override
- public void fail(Tuple input) { failedCount.incrementAndGet(); }
+ public void fail(Tuple input) {
+ failedCount.incrementAndGet();
+ }
+
@Override
- public void reportError(Throwable error) { }
+ public void reportError(Throwable error) {
+ }
});
AlertBolt bolt = createAlertBolt(collector);
@@ -374,21 +400,25 @@ public class TestAlertBolt {
}
@Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {}
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
@Override
- public void ack(Tuple input) {}
+ public void ack(Tuple input) {
+ }
@Override
- public void fail(Tuple input) {}
+ public void fail(Tuple input) {
+ }
@Override
- public void reportError(Throwable error) {}
+ public void reportError(Throwable error) {
+ }
});
AlertBolt bolt = createAlertBolt(collector);
boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def));
- boltSpecs.setVersion("spec_"+System.currentTimeMillis());
+ boltSpecs.setVersion("spec_" + System.currentTimeMillis());
// stream def map
Map<String, StreamDefinition> sds = new HashMap();
StreamDefinition sdTest = new StreamDefinition();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 1e52036..61a0aba 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -66,13 +66,13 @@ public class TestAlertPublisherBolt {
publisher.nextEvent(event1);
}
- private AlertStreamEvent create(String streamId){
+ private AlertStreamEvent create(String streamId) {
AlertStreamEvent alert = new AlertStreamEvent();
PolicyDefinition policy = new PolicyDefinition();
policy.setName("policy1");
alert.setPolicyId(policy.getName());
alert.setCreatedTime(System.currentTimeMillis());
- alert.setData(new Object[]{"field_1", 2, "field_3"});
+ alert.setData(new Object[] {"field_1", 2, "field_3"});
alert.setStreamId(streamId);
alert.setCreatedBy(this.toString());
return alert;
@@ -165,13 +165,13 @@ public class TestAlertPublisherBolt {
return l;
}
- private AlertStreamEvent createWithStreamDef(String hostname, String appName){
+ private AlertStreamEvent createWithStreamDef(String hostname, String appName) {
AlertStreamEvent alert = new AlertStreamEvent();
PolicyDefinition policy = new PolicyDefinition();
policy.setName("perfmon_cpu_host_check");
alert.setPolicyId(policy.getName());
alert.setCreatedTime(System.currentTimeMillis());
- alert.setData(new Object[]{appName, hostname});
+ alert.setData(new Object[] {appName, hostname});
alert.setStreamId("testAlertStream");
alert.setCreatedBy(this.toString());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
index 13550e1..830f1f7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
@@ -16,19 +16,16 @@
*/
package org.apache.eagle.alert.engine.runner;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
@@ -40,7 +37,6 @@ import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
import org.apache.eagle.alert.utils.DateTimeUtil;
import org.apache.eagle.alert.utils.StreamIdConversion;
import org.joda.time.Period;
@@ -49,43 +45,39 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
+import java.io.IOException;
+import java.util.*;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestStreamRouterBolt {
private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
/**
* Mocked 5 Events
- *
+ * <p>
* 1. Sent in random order:
* "value1","value2","value3","value4","value5"
- *
+ * <p>
* 2. Received correct time order and value5 is thrown because too late: "value2","value1","value3","value4"
*
* @throws Exception
*/
@SuppressWarnings("rawtypes")
@Test
- public void testRouterWithSortAndRouteSpec() throws Exception{
+ public void testRouterWithSortAndRouteSpec() throws Exception {
Config config = ConfigFactory.load();
MockChangeService mockChangeService = new MockChangeService();
StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService);
- final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
+ final Map<String, List<PartitionedEvent>> streamCollected = new HashMap<>();
final List<PartitionedEvent> orderCollected = new ArrayList<>();
- OutputCollector collector = new OutputCollector(new IOutputCollector(){
+ OutputCollector collector = new OutputCollector(new IOutputCollector() {
int count = 0;
+
@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
PartitionedEvent event;
@@ -94,27 +86,37 @@ public class TestStreamRouterBolt {
} catch (IOException e) {
throw new RuntimeException(e);
}
- if(count == 0) {
+ if (count == 0) {
count++;
}
LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
- if(!streamCollected.containsKey(streamId)){
- streamCollected.put(streamId,new ArrayList<>());
+ if (!streamCollected.containsKey(streamId)) {
+ streamCollected.put(streamId, new ArrayList<>());
}
streamCollected.get(streamId).add(event);
orderCollected.add(event);
return null;
}
+
@Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
@Override
- public void ack(Tuple input) { }
+ public void ack(Tuple input) {
+ }
+
@Override
- public void fail(Tuple input) { }
+ public void fail(Tuple input) {
+ }
+
@SuppressWarnings("unused")
- public void resetTimeout(Tuple input) { }
+ public void resetTimeout(Tuple input) {
+ }
+
@Override
- public void reportError(Throwable error) { }
+ public void reportError(Throwable error) {
+ }
});
Map stormConf = new HashMap<>();
@@ -144,7 +146,7 @@ public class TestStreamRouterBolt {
routerSpec.setStreamId(streamId);
PolicyWorkerQueue queue = new PolicyWorkerQueue();
queue.setPartition(sp);
- queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
+ queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt1"), new WorkSlot("testTopology", "alertBolt2")));
routerSpec.setTargetQueue(Collections.singletonList(queue));
boltSpec.addRouterSpec(routerSpec);
boltSpec.setVersion("version1");
@@ -178,8 +180,8 @@ public class TestStreamRouterBolt {
// construct event with "value1"
StreamEvent event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000);
- Object[] data = new Object[]{"value1"};
+ event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30") * 1000);
+ Object[] data = new Object[] {"value1"};
event.setData(data);
event.setStreamId(streamId);
PartitionedEvent pEvent = new PartitionedEvent();
@@ -190,8 +192,8 @@ public class TestStreamRouterBolt {
// construct another event with "value2"
event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000);
- data = new Object[]{"value2"};
+ event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10") * 1000);
+ data = new Object[] {"value2"};
event.setData(data);
event.setStreamId(streamId);
pEvent = new PartitionedEvent();
@@ -202,8 +204,8 @@ public class TestStreamRouterBolt {
// construct another event with "value3"
event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000);
- data = new Object[]{"value3"};
+ event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40") * 1000);
+ data = new Object[] {"value3"};
event.setData(data);
event.setStreamId(streamId);
pEvent = new PartitionedEvent();
@@ -214,8 +216,8 @@ public class TestStreamRouterBolt {
// construct another event with "value4"
event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000);
- data = new Object[]{"value4"};
+ event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10") * 1000);
+ data = new Object[] {"value4"};
event.setData(data);
event.setStreamId(streamId);
pEvent = new PartitionedEvent();
@@ -226,8 +228,8 @@ public class TestStreamRouterBolt {
// construct another event with "value5", which will be thrown because too late
event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000);
- data = new Object[]{"value5"};
+ event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10") * 1000);
+ data = new Object[] {"value5"};
event.setData(data);
event.setStreamId(streamId);
pEvent = new PartitionedEvent();
@@ -236,14 +238,14 @@ public class TestStreamRouterBolt {
input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
routerBolt.execute(input);
- Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
- Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1",streamCollected.keySet().contains(
- String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
- Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2",streamCollected.keySet().contains(
- String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
+ Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
+ Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1", streamCollected.keySet().contains(
+ String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1"))));
+ Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2", streamCollected.keySet().contains(
+ String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2"))));
- Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
- Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+ Assert.assertEquals("Should finally collect 3 events", 3, orderCollected.size());
+ Assert.assertArrayEquals("Should sort 3 events in ASC order", new String[] {"value2", "value1", "value3"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
// The first 3 events are ticked automatically by window
@@ -252,14 +254,14 @@ public class TestStreamRouterBolt {
// Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
// The 5th event will be thrown because too late and out of margin
- Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
- Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size());
- Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
+ Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
+ Assert.assertEquals("Should finally collect 3 events", 4, orderCollected.size());
+ Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp", new String[] {"value2", "value1", "value3", "value4"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
}
@SuppressWarnings("serial")
- public static class MockChangeService extends AbstractMetadataChangeNotifyService{
+ public static class MockChangeService extends AbstractMetadataChangeNotifyService {
private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
index 13d1015..a3939cc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
@@ -1,10 +1,5 @@
package org.apache.eagle.alert.engine.serialization;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
import org.apache.commons.lang3.SerializationUtils;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -17,6 +12,11 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -37,18 +37,18 @@ public class JavaSerializationTest {
private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
@Test
- public void testJavaSerialization(){
+ public void testJavaSerialization() {
PartitionedEvent partitionedEvent = new PartitionedEvent();
partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
- partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host")));
+ partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream", Arrays.asList("name", "host")));
StreamEvent event = new StreamEvent();
event.setStreamId("sampleStream");
event.setTimestamp(System.currentTimeMillis());
- event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0});
+ event.setData(new Object[] {"CPU", "LOCALHOST", true, Long.MAX_VALUE, 60.0});
partitionedEvent.setEvent(event);
int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
- LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent);
+ LOG.info("Java serialization length: {}, event: {}", javaSerializationLength, partitionedEvent);
int compactLength = 0;
compactLength += "sampleStream".getBytes().length;
@@ -60,17 +60,17 @@ public class JavaSerializationTest {
compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
compactLength += ByteUtils.doubleToBytes(60.0).length;
- LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent);
+ LOG.info("Compact serialization length: {}, event: {}", compactLength, partitionedEvent);
Assert.assertTrue(compactLength * 20 < javaSerializationLength);
}
- public static StreamDefinition createSampleStreamDefinition(String streamId){
+ public static StreamDefinition createSampleStreamDefinition(String streamId) {
StreamDefinition sampleStreamDefinition = new StreamDefinition();
sampleStreamDefinition.setStreamId(streamId);
sampleStreamDefinition.setTimeseries(true);
sampleStreamDefinition.setValidate(true);
- sampleStreamDefinition.setDescription("Schema for "+streamId);
+ sampleStreamDefinition.setDescription("Schema for " + streamId);
List<StreamColumn> streamColumns = new ArrayList<>();
streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
@@ -82,7 +82,7 @@ public class JavaSerializationTest {
return sampleStreamDefinition;
}
- public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
+ public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) {
StreamPartition streamPartition = new StreamPartition();
streamPartition.setStreamId(streamId);
streamPartition.setColumns(groupByField);
@@ -91,22 +91,22 @@ public class JavaSerializationTest {
}
@SuppressWarnings("serial")
- public static PartitionedEvent createSimpleStreamEvent() {
- StreamEvent event = StreamEvent.Builder()
- .schema(createSampleStreamDefinition("sampleStream_1"))
- .streamId("sampleStream_1")
- .timestamep(System.currentTimeMillis())
- .attributes(new HashMap<String,Object>(){{
- put("name","cpu");
- put("host","localhost");
- put("flag",true);
- put("value",60.0);
- put("data",Long.MAX_VALUE);
- put("unknown","unknown column value");
- }}).build();
+ public static PartitionedEvent createSimpleStreamEvent() {
+ StreamEvent event = StreamEvent.builder()
+ .schema(createSampleStreamDefinition("sampleStream_1"))
+ .streamId("sampleStream_1")
+ .timestamep(System.currentTimeMillis())
+ .attributes(new HashMap<String, Object>() {{
+ put("name", "cpu");
+ put("host", "localhost");
+ put("flag", true);
+ put("value", 60.0);
+ put("data", Long.MAX_VALUE);
+ put("unknown", "unknown column value");
+ }}).build();
PartitionedEvent pEvent = new PartitionedEvent();
pEvent.setEvent(event);
- pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host")));
+ pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name", "host")));
return pEvent;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
index a756ebe..4241c3c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
@@ -16,9 +16,14 @@
*/
package org.apache.eagle.alert.engine.serialization;
-import java.io.IOException;
-import java.util.BitSet;
-
+import backtype.storm.serialization.DefaultKryoFactory;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
import org.apache.commons.lang.time.StopWatch;
import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -30,66 +35,63 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.util.BitSet;
public class PartitionedEventSerializerTest {
private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
+
@SuppressWarnings("deprecation")
@Test
public void testPartitionEventSerialization() throws IOException {
- PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
+ PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
+ ;
PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
- serializer.serialize(partitionedEvent,dataOutput1);
+ serializer.serialize(partitionedEvent, dataOutput1);
byte[] serializedBytes = dataOutput1.toByteArray();
PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
- Assert.assertEquals(partitionedEvent,deserializedEvent);
+ Assert.assertEquals(partitionedEvent, deserializedEvent);
- PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+ PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
- Assert.assertEquals(partitionedEvent,deserializedEventCompressed);
+ Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
- serializer2.serialize(partitionedEvent,dataOutput2);
+ serializer2.serialize(partitionedEvent, dataOutput2);
byte[] serializedBytes2 = dataOutput2.toByteArray();
ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
- Assert.assertEquals(partitionedEvent,deserializedEvent2);
+ Assert.assertEquals(partitionedEvent, deserializedEvent2);
byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
Output output = new Output(10000);
- kryo.writeClassAndObject(output,partitionedEvent);
+ kryo.writeClassAndObject(output, partitionedEvent);
byte[] kryoBytes = output.toBytes();
Input input = new Input(kryoBytes);
PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
- Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
- LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
+ Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
+ LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}", serializedBytes.length, serializedBytesCompressed.length, serializedBytes2.length, javaSerialization.length, kryoBytes.length, kryoSerialize(serializedBytes).length, kryoSerialize(serializedBytes2).length);
}
+
@SuppressWarnings("deprecation")
@Test
public void testPartitionEventSerializationEfficiency() throws IOException {
- PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
+ PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
+ ;
PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
int count = 100000;
StopWatch stopWatch = new StopWatch();
stopWatch.start();
int i = 0;
- while(i<count) {
+ while (i < count) {
ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
serializer.serialize(partitionedEvent, dataOutput1);
byte[] serializedBytes = dataOutput1.toByteArray();
@@ -98,24 +100,24 @@ public class PartitionedEventSerializerTest {
i++;
}
stopWatch.stop();
- LOG.info("Cached Stream: {} ms",stopWatch.getTime());
+ LOG.info("Cached Stream: {} ms", stopWatch.getTime());
stopWatch.reset();
- PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
+ PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
i = 0;
stopWatch.start();
- while(i<count) {
+ while (i < count) {
byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
i++;
}
stopWatch.stop();
- LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime());
+ LOG.info("Compressed Cached Stream: {} ms", stopWatch.getTime());
stopWatch.reset();
i = 0;
stopWatch.start();
- while(i<count) {
+ while (i < count) {
PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
serializer2.serialize(partitionedEvent, dataOutput2);
@@ -126,23 +128,23 @@ public class PartitionedEventSerializerTest {
i++;
}
stopWatch.stop();
- LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime());
+ LOG.info("Cached Stream&Partition: {} ms", stopWatch.getTime());
stopWatch.reset();
i = 0;
stopWatch.start();
- while(i<count) {
+ while (i < count) {
byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization);
Assert.assertEquals(partitionedEvent, javaSerializedEvent);
i++;
}
stopWatch.stop();
- LOG.info("Java Native: {} ms",stopWatch.getTime());
+ LOG.info("Java Native: {} ms", stopWatch.getTime());
stopWatch.reset();
i = 0;
stopWatch.start();
Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
- while(i<count) {
+ while (i < count) {
Output output = new Output(10000);
kryo.writeClassAndObject(output, partitionedEvent);
byte[] kryoBytes = output.toBytes();
@@ -152,73 +154,73 @@ public class PartitionedEventSerializerTest {
i++;
}
stopWatch.stop();
- LOG.info("Kryo: {} ms",stopWatch.getTime());
+ LOG.info("Kryo: {} ms", stopWatch.getTime());
}
/**
* Kryo Serialization Length = Length of byte[] + 2
*/
@Test
- public void testKryoByteArraySerialization(){
+ public void testKryoByteArraySerialization() {
Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
- byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9};
+ byte[] bytes = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
Output output = new Output(1000);
- kryo.writeObject(output,bytes);
- Assert.assertEquals(bytes.length + 2,output.toBytes().length);
+ kryo.writeObject(output, bytes);
+ Assert.assertEquals(bytes.length + 2, output.toBytes().length);
}
- private byte[] kryoSerialize(Object object){
+ private byte[] kryoSerialize(Object object) {
Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
Output output = new Output(100000);
- kryo.writeClassAndObject(output,object);
+ kryo.writeClassAndObject(output, object);
return output.toBytes();
}
@Test
- public void testBitSet(){
+ public void testBitSet() {
BitSet bitSet = new BitSet();
- bitSet.set(0,true); // 1
- bitSet.set(1,false); // 0
- bitSet.set(2,true); // 1
- LOG.info("Bit Set Size: {}",bitSet.size());
- LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray());
- LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray());
- LOG.info("BitSet[0]: {}",bitSet.get(0));
- LOG.info("BitSet[1]: {}",bitSet.get(1));
- LOG.info("BitSet[1]: {}",bitSet.get(2));
+ bitSet.set(0, true); // 1
+ bitSet.set(1, false); // 0
+ bitSet.set(2, true); // 1
+ LOG.info("Bit Set Size: {}", bitSet.size());
+ LOG.info("Bit Set Byte[]: {}", bitSet.toByteArray());
+ LOG.info("Bit Set Byte[]: {}", bitSet.toLongArray());
+ LOG.info("BitSet[0]: {}", bitSet.get(0));
+ LOG.info("BitSet[1]: {}", bitSet.get(1));
+ LOG.info("BitSet[1]: {}", bitSet.get(2));
byte[] bytes = bitSet.toByteArray();
BitSet bitSet2 = BitSet.valueOf(bytes);
- LOG.info("Bit Set Size: {}",bitSet2.size());
- LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray());
- LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray());
- LOG.info("BitSet[0]: {}",bitSet2.get(0));
- LOG.info("BitSet[1]: {}",bitSet2.get(1));
- LOG.info("BitSet[1]: {}",bitSet2.get(2));
+ LOG.info("Bit Set Size: {}", bitSet2.size());
+ LOG.info("Bit Set Byte[]: {}", bitSet2.toByteArray());
+ LOG.info("Bit Set Byte[]: {}", bitSet2.toLongArray());
+ LOG.info("BitSet[0]: {}", bitSet2.get(0));
+ LOG.info("BitSet[1]: {}", bitSet2.get(1));
+ LOG.info("BitSet[1]: {}", bitSet2.get(2));
BitSet bitSet3 = new BitSet();
- bitSet3.set(0,true);
- Assert.assertEquals(1,bitSet3.length());
+ bitSet3.set(0, true);
+ Assert.assertEquals(1, bitSet3.length());
BitSet bitSet4 = new BitSet();
- bitSet4.set(0,false);
- Assert.assertEquals(0,bitSet4.length());
+ bitSet4.set(0, false);
+ Assert.assertEquals(0, bitSet4.length());
Assert.assertFalse(bitSet4.get(1));
Assert.assertFalse(bitSet4.get(2));
}
@Test
- public void testPeriod(){
- Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
- Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000)));
- Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString());
+ public void testPeriod() {
+ Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
+ Assert.assertEquals(30 * 60 * 1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30 * 60 * 1000)));
+ Assert.assertEquals("PT1800S", Period.millis(30 * 60 * 1000).toString());
}
@Test
- public void testPartitionType(){
+ public void testPartitionType() {
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
index 3d373b6..9520b62 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
@@ -32,32 +32,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* @since Jun 21, 2016
- *
*/
public class SiddhiPolicyTest {
-
+
private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class);
private String streams = " define stream syslog_stream("
- + "dims_facility string, "
- + "dims_severity string, "
- + "dims_hostname string, "
- + "dims_msgid string, "
- + "timestamp string, "
- + "conn string, "
- + "op string, "
- + "msgId string, "
- + "command string, "
- + "name string, "
- + "namespace string, "
- + "epochMillis long); ";
+ + "dims_facility string, "
+ + "dims_severity string, "
+ + "dims_hostname string, "
+ + "dims_msgid string, "
+ + "timestamp string, "
+ + "conn string, "
+ + "op string, "
+ + "msgId string, "
+ + "command string, "
+ + "name string, "
+ + "namespace string, "
+ + "epochMillis long); ";
private SiddhiManager sm;
-
+
@Before
public void setup() {
sm = new SiddhiManager();
}
-
+
@After
public void shutdown() {
sm.shutdown();
@@ -70,7 +69,9 @@ public class SiddhiPolicyTest {
@Override
public void receive(Event[] arg0) {
- };
+ }
+
+ ;
};
String executionPlan = streams + ql;
@@ -83,13 +84,13 @@ public class SiddhiPolicyTest {
@Test
public void testPolicy_agg() throws Exception {
String sql = " from syslog_stream#window.time(1min) select "
- + "name, "
- + "namespace, "
- + "timestamp, "
- + "dims_hostname, "
- + "count(*) as abortCount "
- + "group by dims_hostname "
- + "having abortCount > 3 insert into syslog_severity_check_output; ";
+ + "name, "
+ + "namespace, "
+ + "timestamp, "
+ + "dims_hostname, "
+ + "count(*) as abortCount "
+ + "group by dims_hostname "
+ + "having abortCount > 3 insert into syslog_severity_check_output; ";
final AtomicBoolean checked = new AtomicBoolean(false);
StreamCallback sc = new StreamCallback() {
@@ -107,7 +108,9 @@ public class SiddhiPolicyTest {
Assert.assertTrue(hosts.contains("HOSTNAME-" + 1));
Assert.assertTrue(hosts.contains("HOSTNAME-" + 2));
Assert.assertFalse(hosts.contains("HOSTNAME-" + 3));
- };
+ }
+
+ ;
};
String executionPlan = streams + sql;
@@ -124,7 +127,7 @@ public class SiddhiPolicyTest {
runtime.shutdown();
}
-
+
/*
+ "dims_facility string, "
+ "dims_severity string, "
@@ -145,8 +148,8 @@ public class SiddhiPolicyTest {
for (int i = 0; i < length; i++) {
Event e = new Event(12);
e.setTimestamp(System.currentTimeMillis());
- e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i%4 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
+ e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i % 4, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+
events[i] = e;
}
@@ -156,7 +159,7 @@ public class SiddhiPolicyTest {
Event e = new Event(12);
e.setTimestamp(System.currentTimeMillis());
- e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+ e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
handler.send(e);
}
@@ -164,28 +167,30 @@ public class SiddhiPolicyTest {
@Test
public void testPolicy_regex() throws Exception {
String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
-
+
AtomicBoolean checked = new AtomicBoolean();
StreamCallback sc = new StreamCallback() {
@Override
public void receive(Event[] arg0) {
checked.set(true);
- };
+ }
+
+ ;
};
String executionPlan = streams + sql;
ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan);
- runtime.addCallback("syslog_severity_check_output", sc);
+ runtime.addCallback("syslog_severity_check_output", sc);
runtime.start();
-
+
InputHandler handler = runtime.getInputHandler("syslog_stream");
-
+
sendInput(handler);
-
+
Thread.sleep(1000);
-
+
Assert.assertTrue(checked.get());
-
+
runtime.shutdown();
}
@@ -193,17 +198,19 @@ public class SiddhiPolicyTest {
@Test
public void testPolicy_seq() throws Exception {
String sql = ""
- + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
- + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
- + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
- + " insert into syslog_severity_check_output; ";
+ + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
+ + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
+ + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
+ + " insert into syslog_severity_check_output; ";
AtomicBoolean checked = new AtomicBoolean();
StreamCallback sc = new StreamCallback() {
@Override
public void receive(Event[] arg0) {
checked.set(true);
- };
+ }
+
+ ;
};
String executionPlan = streams + sql;
@@ -224,21 +231,21 @@ public class SiddhiPolicyTest {
// validate one
Event e = new Event(12);
e.setTimestamp(System.currentTimeMillis());
- e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
+ e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+
e = new Event(12);
e.setTimestamp(System.currentTimeMillis());
- e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-
+ e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+
e = new Event(12);
e.setTimestamp(System.currentTimeMillis());
- e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+ e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
Thread.sleep(61 * 1000);
e = new Event(12);
e.setTimestamp(System.currentTimeMillis());
- e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+ e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
handler.send(e);
}
@@ -246,7 +253,7 @@ public class SiddhiPolicyTest {
@Test
public void testStrConcat() throws Exception {
String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
- " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
+ " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
SiddhiManager manager = new SiddhiManager();
ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
runtime.addCallback("output", new StreamCallback() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
index f2f3b46..7694623 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
@@ -18,10 +18,6 @@
*/
package org.apache.eagle.alert.engine.siddhi.extension;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,9 +27,12 @@ import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
/**
* @since Apr 1, 2016
- *
*/
public class AttributeCollectAggregatorTest {
@@ -85,59 +84,59 @@ public class AttributeCollectAggregatorTest {
Event e = null;
long base = System.currentTimeMillis();
{
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
base += 100;
events.add(e);
}
{
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
base += 100;
events.add(e);
}
base += 10000;
{
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
base += 100;
events.add(e);
}
{
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
base += 100;
events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
base += 100;
events.add(e);
}
base += 10000;
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" });
+ e = new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"});
return events.toArray(new Event[0]);
}
-
+
@Test
public void testQuery() {
String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";