You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/06/02 07:07:49 UTC
[10/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen
alert engine code on branch-0.5
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
deleted file mode 100644
index 97e6310..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-@SuppressWarnings("serial")
-public class MockSampleMetadataFactory {
- private static MockStreamMetadataService mockStreamMetadataServiceInstance = null;
- public static MockStreamMetadataService createSingletonMetadataServiceWithSample(){
- if(mockStreamMetadataServiceInstance!=null) return mockStreamMetadataServiceInstance;
- mockStreamMetadataServiceInstance = new MockStreamMetadataService();
- mockStreamMetadataServiceInstance.registerStream("sampleStream",createSampleStreamDefinition("sampleStream"));
- mockStreamMetadataServiceInstance.registerStream("sampleStream_1",createSampleStreamDefinition("sampleStream_1"));
- mockStreamMetadataServiceInstance.registerStream("sampleStream_2",createSampleStreamDefinition("sampleStream_2"));
- mockStreamMetadataServiceInstance.registerStream("sampleStream_3",createSampleStreamDefinition("sampleStream_3"));
- mockStreamMetadataServiceInstance.registerStream("sampleStream_4",createSampleStreamDefinition("sampleStream_4"));
- return mockStreamMetadataServiceInstance;
- }
-
- public static StreamDefinition createSampleStreamDefinition(String streamId){
- StreamDefinition sampleStreamDefinition = new StreamDefinition();
- sampleStreamDefinition.setStreamId(streamId);
- sampleStreamDefinition.setTimeseries(true);
- sampleStreamDefinition.setValidate(true);
- sampleStreamDefinition.setDescription("Schema for "+streamId);
- List<StreamColumn> streamColumns = new ArrayList<>();
-
- streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
- streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
- streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-// streamColumns.add(new StreamColumn.Builder().name("value1").type(StreamColumn.Type.DOUBLE).build());
-// streamColumns.add(new StreamColumn.Builder().name("value2").type(StreamColumn.Type.DOUBLE).build());
-// streamColumns.add(new StreamColumn.Builder().name("value3").type(StreamColumn.Type.DOUBLE).build());
-// streamColumns.add(new StreamColumn.Builder().name("value4").type(StreamColumn.Type.DOUBLE).build());
-// streamColumns.add(new StreamColumn.Builder().name("value5").type(StreamColumn.Type.DOUBLE).build());
- sampleStreamDefinition.setColumns(streamColumns);
- return sampleStreamDefinition;
- }
-
- /**
- * By default window period is: PT1m
- *
- * @param streamId
- * @return
- */
- public static StreamSortSpec createSampleStreamSortSpec(String streamId){
- StreamSortSpec streamSortSpec = new StreamSortSpec();
-// streamSortSpec.setColumn("timestamp");
-// streamSortSpec.setStreamId(streamId);
- streamSortSpec.setWindowMargin(1000);
- streamSortSpec.setWindowPeriod("PT1m");
- return streamSortSpec;
- }
-
- public static StreamSortSpec createSampleStreamSortSpec(String streamId,String period,int margin){
- StreamSortSpec streamSortSpec = new StreamSortSpec();
-// streamSortSpec.setColumn("timestamp");
-// streamSortSpec.setStreamId(streamId);
- streamSortSpec.setWindowMargin(margin);
- streamSortSpec.setWindowPeriod(period);
- return streamSortSpec;
- }
-
- /**
- * Policy: from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;
- *
- * @return PolicyDefinition[from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;]
- */
- public static PolicyDefinition createSingleMetricSamplePolicy(){
- String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;";
- PolicyDefinition policyDefinition = new PolicyDefinition();
- policyDefinition.setName("SamplePolicyForTest");
- policyDefinition.setInputStreams(Arrays.asList("sampleStream_1"));
- policyDefinition.setOutputStreams(Arrays.asList("outputStream"));
- policyDefinition.setDefinition(new PolicyDefinition.Definition(
- PolicyStreamHandlers.SIDDHI_ENGINE,
- definePolicy
- ));
- policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1",Arrays.asList("name"))));
- return policyDefinition;
- }
-
- public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
- StreamPartition streamPartition = new StreamPartition();
- streamPartition.setStreamId(streamId);
- streamPartition.setColumns(new ArrayList<>(groupByField));
- streamPartition.setType(StreamPartition.Type.GROUPBY);
- StreamSortSpec streamSortSpec = new StreamSortSpec();
- streamSortSpec.setWindowPeriod("PT30m");
- streamSortSpec.setWindowMargin(10000);
- streamPartition.setSortSpec(streamSortSpec);
- return streamPartition;
- }
-
- public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds){
- List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream().map((t) -> {
- return new WorkSlot("sampleTopology", t);
- }).toArray(WorkSlot[]::new));
- StreamRouterSpec streamRouteSpec = new StreamRouterSpec();
- streamRouteSpec.setStreamId(streamId);
- streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId,Arrays.asList(groupByField)));
- streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots)));
- return streamRouteSpec;
- }
-
- public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds){
- return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
- }
-
- /**
- * GROUPBY_sampleStream_1_ON_name
- *
- * @param targetEvaluatorIds
- * @return
- */
- public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds){
- return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
- }
-
- public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds){
- return createSampleStreamRouteSpec("sampleStream_2","name",targetEvaluatorIds);
- }
-
- public static PartitionedEvent createSimpleStreamEvent() {
- StreamEvent event = null;
- try {
- event = StreamEvent.Builder()
- .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1"))
- .streamId("sampleStream_1")
- .timestamep(System.currentTimeMillis())
- .attributes(new HashMap<String,Object>(){{
- put("name","cpu");
- put("value",60.0);
- put("unknown","unknown column value");
- }}).build();
- } catch (StreamDefinitionNotFoundException e) {
- e.printStackTrace();
- }
- PartitionedEvent pEvent = new PartitionedEvent();
- pEvent.setEvent(event);
- return pEvent;
- }
-
- private final static String[] SAMPLE_STREAM_NAME_OPTIONS=new String[]{
- "cpu","memory","disk","network"
- };
-
- private final static String[] SAMPLE_STREAM_HOST_OPTIONS =new String[]{
- "localhost_1","localhost_2","localhost_3","localhost_4"
- };
-
- private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS=new Boolean[]{
- true,false
- };
-
- private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS=new Double[]{
- -0.20, 40.4,50.5,60.6,10000.1
- };
- private final static String[] SAMPLE_STREAM_ID_OPTIONS=new String[]{
- "sampleStream_1","sampleStream_2","sampleStream_3","sampleStream_4",
- };
- private final static Random RANDOM = ThreadLocalRandom.current();
-
- public static StreamEvent createRandomStreamEvent() {
- return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]);
- }
-
- public static StreamEvent createRandomStreamEvent(String streamId) {
- return createRandomStreamEvent(streamId,System.currentTimeMillis());
- }
-
- private final static Long[] TIME_DELTA_OPTIONS = new Long[]{
- -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
- };
-
- public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId) {
- StreamEvent event = createRandomStreamEvent(streamId);
- event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
- return event;
- }
-
-
- public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId) {
- StreamEvent event = createRandomStreamEvent(streamId);
- event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
- return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
- }
-
- public static PartitionedEvent createPartitionedEventGroupedByName(String streamId,long timestamp) {
- StreamEvent event = createRandomStreamEvent(streamId);
- event.setTimestamp(timestamp);
- return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
- }
-
- public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId) {
- StreamEvent event = createRandomStreamEvent(streamId);
- event.setTimestamp(System.currentTimeMillis());
- return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
- }
-
- public static StreamEvent createRandomStreamEvent(String streamId, long timestamp) {
- StreamEvent event;
- try {
- event = StreamEvent.Builder()
- .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
- .streamId(streamId)
- .timestamep(timestamp)
- .attributes(new HashMap<String,Object>(){{
- put("name",SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
- put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
- put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
- put("flag",SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
-// put("value1", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-// put("value2", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-// put("value3", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-// put("value4", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-// put("value5", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
- put("unknown","unknown column value");
- }}).build();
- } catch (StreamDefinitionNotFoundException e) {
- throw new IllegalStateException(e.getMessage(),e);
- }
- return event;
- }
-
- public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp) {
- StreamEvent event = createRandomStreamEvent(streamId,timestamp);
- PartitionedEvent partitionedEvent = new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
- return partitionedEvent;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
deleted file mode 100755
index fa07701..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MockStreamCollector implements Collector<AlertStreamEvent> {
- @SuppressWarnings("unused")
- private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class);
- private List<AlertStreamEvent> cache;
- public MockStreamCollector(){
- cache = new LinkedList<>();
- }
-
- public void emit(AlertStreamEvent event) {
- cache.add(event);
- // LOG.info("PartitionedEventCollector received: {}",event);
- }
-
- public void clear(){
- cache.clear();
- }
-
- public List<AlertStreamEvent> get(){
- return cache;
- }
-
- public int size(){
- return cache.size();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
deleted file mode 100644
index 2119ecd..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-
-public class MockStreamMetadataService{
- private final Map<String,StreamDefinition> streamSchemaMap = new HashMap<>();
-
- public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
- if(streamSchemaMap.containsKey(streamId)) {
- return streamSchemaMap.get(streamId);
- }else {
- throw new StreamDefinitionNotFoundException(streamId);
- }
- }
-
- public void registerStream(String streamId, StreamDefinition schema){
- streamSchemaMap.put(streamId,schema);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
deleted file mode 100644
index 1e44c2a..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.eagle.alert.engine.mock;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@SuppressWarnings("serial")
-public class MockStreamReceiver extends BaseRichSpout {
- private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class);
- private SpoutOutputCollector collector;
- private List<String> outputStreamIds;
- public MockStreamReceiver(int partition){
- outputStreamIds = new ArrayList<>(partition);
- for(int i=0;i<partition;i++){
- outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
- }
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void close() {}
-
- /**
- * This unit test is not to mock the end2end logic of correlation spout,
- * but simply generate some sample data for following bolts testing
- */
- @Override
- public void nextTuple() {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
- LOG.info("Receive {}",event);
- collector.emit(outputStreamIds.get(
- // group by the first field in event i.e. name
- (int) (event.getPartitionKey() % outputStreamIds.size())),
- Collections.singletonList(event));
- Utils.sleep(500);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for(String streamId:outputStreamIds) {
- declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0));
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
deleted file mode 100644
index 251e47f..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.perf;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Since 5/13/16.
- */
-public class TestSerDeserPer {
- Object[] data = null;
- @Before
- public void before(){
- int max = 100;
- StringBuilder sb = new StringBuilder();
- for(int i=0; i<max; i++){
- sb.append("a");
- }
- data = new Object[]{sb.toString()};
- }
-
- @Test
- public void testSerDeserPerf() throws Exception{
- Kryo kryo = new Kryo();
- Output output = new Output(new FileOutputStream("/tmp/file.bin"));
- for(int i=0; i<1000; i++){
- kryo.writeObject(output, constructPE());
- }
- output.close();
- Input input = new Input(new FileInputStream("/tmp/file.bin"));
- PartitionedEvent someObject = kryo.readObject(input, PartitionedEvent.class);
- input.close();
- Assert.assertTrue(someObject.getData().length == 1);
- }
-
- private PartitionedEvent constructPE(){
- StreamEvent e = new StreamEvent();
- e.setStreamId("testStreamId");
- e.setTimestamp(1463159382000L);
- e.setData(data);
- StreamPartition sp = new StreamPartition();
- List<String> col = new ArrayList<>();
- col.add("host");
- sp.setColumns(col);
- StreamSortSpec sortSpec = new StreamSortSpec();
- sortSpec.setWindowMargin(30000);
- sortSpec.setWindowPeriod("PT1M");
- sp.setSortSpec(sortSpec);
- sp.setStreamId("testStreamId");
- sp.setType(StreamPartition.Type.GROUPBY);
- PartitionedEvent pe = new PartitionedEvent();
- pe.setEvent(e);
- pe.setPartition(sp);
- pe.setPartitionKey(1000);
- return pe;
- }
-
- @Test
- public void testSerDeserPerf2() throws Exception{
- Kryo kryo = new Kryo();
- Output output = new Output(new FileOutputStream("/tmp/file2.bin"));
- for(int i=0; i<1000; i++){
- kryo.writeObject(output, constructNewPE());
- }
- output.close();
- Input input = new Input(new FileInputStream("/tmp/file2.bin"));
- NewPartitionedEvent someObject = kryo.readObject(input, NewPartitionedEvent.class);
- input.close();
- Assert.assertTrue(someObject.getData().length == 1);
- }
-
- private NewPartitionedEvent constructNewPE(){
- NewPartitionedEvent pe = new NewPartitionedEvent();
- pe.setStreamId("testStreamId");
- pe.setTimestamp(1463159382000L);
- pe.setData(data);
-
- pe.setType(StreamPartition.Type.GROUPBY);
- List<String> col = new ArrayList<>();
- col.add("host");
- pe.setColumns(col);
- pe.setPartitionKey(1000);
-
- pe.setWindowMargin(30000);
- pe.setWindowPeriod("PT1M");
- return pe;
- }
-
- @Test
- public void testSerDeserPerf3() throws Exception{
- Kryo kryo = new Kryo();
- Output output = new Output(new FileOutputStream("/tmp/file3.bin"));
- for(int i=0; i<1000; i++){
- kryo.writeObject(output, constructNewPE2());
- }
- output.close();
- Input input = new Input(new FileInputStream("/tmp/file3.bin"));
- NewPartitionedEvent2 someObject = kryo.readObject(input, NewPartitionedEvent2.class);
- input.close();
- Assert.assertTrue(someObject.getData().length == 1);
- }
-
- private NewPartitionedEvent2 constructNewPE2(){
- NewPartitionedEvent2 pe = new NewPartitionedEvent2();
- pe.setStreamId(100);
- pe.setTimestamp(1463159382000L);
- pe.setData(data);
-
- pe.setType(1);
- int[] col = new int[1];
- col[0] = 1;
- pe.setColumns(col);
- pe.setPartitionKey(1000);
-
- pe.setWindowMargin(30000);
- pe.setWindowPeriod(60);
- return pe;
- }
-
- public static class NewPartitionedEvent implements Serializable {
- private static final long serialVersionUID = -3840016190614238593L;
- // basic
- private String streamId;
- private long timestamp;
- private Object[] data;
-
- // stream partition
- private StreamPartition.Type type;
- private List<String> columns = new ArrayList<>();
- private long partitionKey;
-
- // sort spec
- private String windowPeriod="";
- private long windowMargin = 30 * 1000;
-
- public NewPartitionedEvent(){
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public Object[] getData() {
- return data;
- }
-
- public void setData(Object[] data) {
- this.data = data;
- }
-
- public StreamPartition.Type getType() {
- return type;
- }
-
- public void setType(StreamPartition.Type type) {
- this.type = type;
- }
-
- public List<String> getColumns() {
- return columns;
- }
-
- public void setColumns(List<String> columns) {
- this.columns = columns;
- }
-
- public long getPartitionKey() {
- return partitionKey;
- }
-
- public void setPartitionKey(long partitionKey) {
- this.partitionKey = partitionKey;
- }
-
- public String getWindowPeriod() {
- return windowPeriod;
- }
-
- public void setWindowPeriod(String windowPeriod) {
- this.windowPeriod = windowPeriod;
- }
-
- public long getWindowMargin() {
- return windowMargin;
- }
-
- public void setWindowMargin(long windowMargin) {
- this.windowMargin = windowMargin;
- }
- }
-
- public static class NewPartitionedEvent2 implements Serializable {
- private static final long serialVersionUID = -3840016190614238593L;
- // basic
- private int streamId;
- private long timestamp;
- private Object[] data;
-
- // stream partition
- private int type;
- private int[] columns;
- private long partitionKey;
-
- // sort spec
- private long windowPeriod;
- private long windowMargin = 30 * 1000;
-
- public NewPartitionedEvent2(){
- }
-
- public int getStreamId() {
- return streamId;
- }
-
- public void setStreamId(int streamId) {
- this.streamId = streamId;
- }
-
- public int getType() {
- return type;
- }
-
- public void setType(int type) {
- this.type = type;
- }
-
- public int[] getColumns() {
- return columns;
- }
-
- public void setColumns(int[] columns) {
- this.columns = columns;
- }
-
- public long getPartitionKey() {
- return partitionKey;
- }
-
- public void setPartitionKey(long partitionKey) {
- this.partitionKey = partitionKey;
- }
-
- public long getWindowPeriod() {
- return windowPeriod;
- }
-
- public void setWindowPeriod(long windowPeriod) {
- this.windowPeriod = windowPeriod;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public Object[] getData() {
- return data;
- }
-
- public void setData(Object[] data) {
- this.data = data;
- }
-
- public long getWindowMargin() {
- return windowMargin;
- }
-
- public void setWindowMargin(long windowMargin) {
- this.windowMargin = windowMargin;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
deleted file mode 100755
index e322099..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.runner.AlertBolt;
-import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Since 5/2/16.
- */
-@SuppressWarnings({"rawtypes", "unused"})
-public class TestAlertBolt {
- /**
- * Following knowledge is guaranteed in
- *
- * @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{
- * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){
- * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName());
- * }
- * }
- *
- * @throws Exception
- *
- * Add test case: 2 alerts should be generated even if they are very close to each other in timestamp
- */
- @Test
- public void testAlertBolt() throws Exception{
- final AtomicInteger alertCount = new AtomicInteger();
- final Semaphore mutex = new Semaphore(0);
- Config config = ConfigFactory.load();
- PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl");
- TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService();
- AlertBolt bolt = new AlertBolt("alertBolt1", policyGroupEvaluator, config, mockChangeService);
- OutputCollector collector = new OutputCollector(new IOutputCollector(){
- int count = 0;
- @Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- alertCount.incrementAndGet();
- mutex.release();
- Assert.assertEquals("testAlertStream", tuple.get(0));
- AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
- System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
- return null;
- }
- @Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
- @Override
- public void ack(Tuple input) { }
- @Override
- public void fail(Tuple input) { }
- @Override
- public void reportError(Throwable error) { }
- });
- Map stormConf = new HashMap<>();
- TopologyContext topologyContext = mock(TopologyContext.class);
- when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
- bolt.prepare(stormConf, topologyContext, collector);
-
- String streamId = "cpuUsageStream";
-
- // construct StreamDefinition
- StreamDefinition schema = new StreamDefinition();
- schema.setStreamId(streamId);
- StreamColumn column = new StreamColumn();
- column.setName("col1");
- column.setType(StreamColumn.Type.STRING);
- schema.setColumns(Collections.singletonList(column));
- Map<String, StreamDefinition> sds = new HashMap<>();
- sds.put(schema.getStreamId(), schema);
-
- // construct StreamPartition
- StreamPartition sp = new StreamPartition();
- sp.setColumns(Collections.singletonList("col1"));
- sp.setStreamId(streamId);
- sp.setType(StreamPartition.Type.GROUPBY);
-
- AlertBoltSpec spec = new AlertBoltSpec();
- spec.setVersion("version1");
- spec.setTopologyName("testTopology");
- PolicyDefinition pd = new PolicyDefinition();
- pd.setName("policy1");
- pd.setPartitionSpec(Collections.singletonList(sp));
- pd.setOutputStreams(Collections.singletonList("testAlertStream"));
- pd.setInputStreams(Collections.singletonList(streamId));
- pd.setDefinition(new PolicyDefinition.Definition());
- pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE;
- pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;";
- spec.addBoltPolicy("alertBolt1", pd.getName());
- spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd)));
- bolt.onAlertBoltSpecChange(spec, sds);
-
- // contruct GeneralTopologyContext
- GeneralTopologyContext context = mock(GeneralTopologyContext.class);
- int taskId = 1;
- when(context.getComponentId(taskId)).thenReturn("comp1");
- when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
- // construct event with "value1"
- StreamEvent event1 = new StreamEvent();
- event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
- Object[] data = new Object[]{"value1"};
- event1.setData(data);
- event1.setStreamId(streamId);
- PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp,1001);
-
- // construct another event with "value1"
- StreamEvent event2 = new StreamEvent();
- event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00")*1000);
- data = new Object[]{"value2"};
- event2.setData(data);
- event2.setStreamId(streamId);
- PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp,1001);
-
- Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
- Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
- bolt.execute(input);
- bolt.execute(input2);
- Assert.assertTrue("Timeout to acquire mutex in 5s",mutex.tryAcquire(2, 5, TimeUnit.SECONDS));
- Assert.assertEquals(2, alertCount.get());
- bolt.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
deleted file mode 100644
index af79f96..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
-import org.apache.eagle.alert.engine.runner.MapComparator;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @Since 5/14/16.
- */
-public class TestAlertPublisherBolt {
-
- @Ignore
- @Test
- public void test() {
- Config config = ConfigFactory.load("application-test.conf");
- AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
- publisher.init(config);
- PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
- publisher.onPublishChange(spec.getPublishments(), null, null, null);
- AlertStreamEvent event = create("testAlertStream");
- publisher.nextEvent(event);
- AlertStreamEvent event1 = create("testAlertStream");
- publisher.nextEvent(event1);
- }
-
- private AlertStreamEvent create(String streamId){
- AlertStreamEvent alert = new AlertStreamEvent();
- PolicyDefinition policy = new PolicyDefinition();
- policy.setName("policy1");
- alert.setPolicy(policy);
- alert.setCreatedTime(System.currentTimeMillis());
- alert.setData(new Object[]{"field_1", 2, "field_3"});
- alert.setStreamId(streamId);
- alert.setCreatedBy(this.toString());
- return alert;
- }
-
- @Test
- public void testMapComparator() {
- PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
- PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
- Map<String, Publishment> map1 = new HashMap<>();
- Map<String, Publishment> map2 = new HashMap<>();
- spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
- spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
- MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
- comparator.compare();
- Assert.assertTrue(comparator.getModified().size() == 1);
-
- AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
- AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublisher, null, null);
- publisherBolt.onAlertPublishSpecChange(spec1, null);
- publisherBolt.onAlertPublishSpecChange(spec2, null);
- }
-
- @Test
- public void testAlertPublisher() throws Exception {
- AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test");
- List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class);
- List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class);
- alertPublisher.onPublishChange(oldPubs, null, null, null);
- alertPublisher.onPublishChange(null, null, newPubs, oldPubs);
- }
-
- private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
- JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
- List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
- return l;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
deleted file mode 100644
index 8c048cb..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStreamRouterBolt {
- private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
-
- /**
- * Mocked 5 Events
- *
- * 1. Sent in random order:
- * "value1","value2","value3","value4","value5"
- *
- * 2. Received correct time order and value5 is thrown because too late: "value2","value1","value3","value4"
- *
- * @throws Exception
- */
- @SuppressWarnings("rawtypes")
- @Test
- public void testRouterWithSortAndRouteSpec() throws Exception{
- Config config = ConfigFactory.load();
- StreamRouterImpl routerImpl = new StreamRouterImpl("testStreamRouterImpl");
- MockChangeService mockChangeService = new MockChangeService();
- StreamRouterBolt bolt = new StreamRouterBolt(routerImpl, config, mockChangeService);
-
- final Map<String,List<PartitionedEvent>> streamCollected = new HashMap<>();
- final List<PartitionedEvent> orderCollected = new ArrayList<>();
-
- OutputCollector collector = new OutputCollector(new IOutputCollector(){
- int count = 0;
- @Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
- PartitionedEvent event;
- try {
- event = bolt.deserialize(tuple.get(0));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if(count == 0) {
- count++;
- }
- LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
- if(!streamCollected.containsKey(streamId)){
- streamCollected.put(streamId,new ArrayList<>());
- }
- streamCollected.get(streamId).add(event);
- orderCollected.add(event);
- return null;
- }
- @Override
- public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { }
- @Override
- public void ack(Tuple input) { }
- @Override
- public void fail(Tuple input) { }
- @SuppressWarnings("unused")
- public void resetTimeout(Tuple input) { }
- @Override
- public void reportError(Throwable error) { }
- });
-
- Map stormConf = new HashMap<>();
- TopologyContext topologyContext = mock(TopologyContext.class);
- when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
- bolt.prepare(stormConf, topologyContext, collector);
-
- String streamId = "cpuUsageStream";
- // StreamPartition, groupby col1 for stream cpuUsageStream
- StreamPartition sp = new StreamPartition();
- sp.setStreamId(streamId);
- sp.setColumns(Collections.singletonList("col1"));
- sp.setType(StreamPartition.Type.GROUPBY);
-
- StreamSortSpec sortSpec = new StreamSortSpec();
-// sortSpec.setColumn("timestamp");
-// sortSpec.setOrder("asc");
- sortSpec.setWindowPeriod2(Period.minutes(1));
- sortSpec.setWindowMargin(1000);
- sp.setSortSpec(sortSpec);
-
- RouterSpec boltSpec = new RouterSpec();
-
- // set StreamRouterSpec to have 2 WorkSlot
- StreamRouterSpec routerSpec = new StreamRouterSpec();
- routerSpec.setPartition(sp);
- routerSpec.setStreamId(streamId);
- PolicyWorkerQueue queue = new PolicyWorkerQueue();
- queue.setPartition(sp);
- queue.setWorkers(Arrays.asList(new WorkSlot("testTopology","alertBolt1"), new WorkSlot("testTopology","alertBolt2")));
- routerSpec.setTargetQueue(Collections.singletonList(queue));
- boltSpec.addRouterSpec(routerSpec);
-
- // construct StreamDefinition
- StreamDefinition schema = new StreamDefinition();
- schema.setStreamId(streamId);
- StreamColumn column = new StreamColumn();
- column.setName("col1");
- column.setType(StreamColumn.Type.STRING);
- schema.setColumns(Collections.singletonList(column));
- Map<String, StreamDefinition> sds = new HashMap<>();
- sds.put(schema.getStreamId(), schema);
-
- bolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
- bolt.onStreamRouteBoltSpecChange(boltSpec, sds);
- GeneralTopologyContext context = mock(GeneralTopologyContext.class);
- int taskId = 1;
- when(context.getComponentId(taskId)).thenReturn("comp1");
- when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
- // =======================================
- // Mock 5 Events
- //
- // 1. Sent in random order:
- // "value1","value2","value3","value4","value5"
- //
- // 2. Received correct time order and value5 is thrown because too:
- // "value2","value1","value3","value4"
- // =======================================
-
- // construct event with "value1"
- StreamEvent event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30")*1000);
- Object[] data = new Object[]{"value1"};
- event.setData(data);
- event.setStreamId(streamId);
- PartitionedEvent pEvent = new PartitionedEvent();
- pEvent.setEvent(event);
- pEvent.setPartition(sp);
- Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
-
- // construct another event with "value2"
- event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10")*1000);
- data = new Object[]{"value2"};
- event.setData(data);
- event.setStreamId(streamId);
- pEvent = new PartitionedEvent();
- pEvent.setPartition(sp);
- pEvent.setEvent(event);
- input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
-
- // construct another event with "value3"
- event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40")*1000);
- data = new Object[]{"value3"};
- event.setData(data);
- event.setStreamId(streamId);
- pEvent = new PartitionedEvent();
- pEvent.setPartition(sp);
- pEvent.setEvent(event);
- input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
-
- // construct another event with "value4"
- event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10")*1000);
- data = new Object[]{"value4"};
- event.setData(data);
- event.setStreamId(streamId);
- pEvent = new PartitionedEvent();
- pEvent.setPartition(sp);
- pEvent.setEvent(event);
- input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
-
- // construct another event with "value5", which will be thrown because two late
- event = new StreamEvent();
- event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10")*1000);
- data = new Object[]{"value5"};
- event.setData(data);
- event.setStreamId(streamId);
- pEvent = new PartitionedEvent();
- pEvent.setPartition(sp);
- pEvent.setEvent(event);
- input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default");
- bolt.execute(input);
-
- Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
- Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt1",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt1"));
- Assert.assertTrue("Should collect stream stream_testStreamRouterImpl_to_alertBolt2",streamCollected.keySet().contains("stream_testStreamRouterImpl_to_alertBolt2"));
-
- Assert.assertEquals("Should finally collect 3 events",3,orderCollected.size());
- Assert.assertArrayEquals("Should sort 3 events in ASC order",new String[]{"value2","value1","value3"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
-
- // The first 3 events are ticked automatically by window
-
- bolt.cleanup();
-
- // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
- // The 5th event will be thrown because too late and out of margin
-
- Assert.assertEquals("Should finally collect two streams",2,streamCollected.size());
- Assert.assertEquals("Should finally collect 3 events",4,orderCollected.size());
- Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp",new String[]{"value2","value1","value3","value4"},orderCollected.stream().map((d)->d.getData()[0]).toArray());
-
- }
-
- @SuppressWarnings("serial")
- public static class MockChangeService extends AbstractMetadataChangeNotifyService{
- private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
-
- @Override
- public void close() throws IOException {
- LOG.info("Closing");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
deleted file mode 100644
index 13d1015..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package org.apache.eagle.alert.engine.serialization;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.ByteUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class JavaSerializationTest {
- private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
-
- @Test
- public void testJavaSerialization(){
- PartitionedEvent partitionedEvent = new PartitionedEvent();
- partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
- partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream",Arrays.asList("name","host")));
- StreamEvent event = new StreamEvent();
- event.setStreamId("sampleStream");
- event.setTimestamp(System.currentTimeMillis());
- event.setData(new Object[]{"CPU","LOCALHOST",true,Long.MAX_VALUE,60.0});
- partitionedEvent.setEvent(event);
-
- int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
- LOG.info("Java serialization length: {}, event: {}",javaSerializationLength,partitionedEvent);
-
- int compactLength = 0;
- compactLength += "sampleStream".getBytes().length;
- compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length;
- compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length;
- compactLength += "CPU".getBytes().length;
- compactLength += "LOCALHOST".getBytes().length;
- compactLength += 1;
- compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
- compactLength += ByteUtils.doubleToBytes(60.0).length;
-
- LOG.info("Compact serialization length: {}, event: {}",compactLength,partitionedEvent);
- Assert.assertTrue(compactLength * 20 < javaSerializationLength);
- }
-
-
- public static StreamDefinition createSampleStreamDefinition(String streamId){
- StreamDefinition sampleStreamDefinition = new StreamDefinition();
- sampleStreamDefinition.setStreamId(streamId);
- sampleStreamDefinition.setTimeseries(true);
- sampleStreamDefinition.setValidate(true);
- sampleStreamDefinition.setDescription("Schema for "+streamId);
- List<StreamColumn> streamColumns = new ArrayList<>();
-
- streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
- streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
- streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
- sampleStreamDefinition.setColumns(streamColumns);
- return sampleStreamDefinition;
- }
-
- public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
- StreamPartition streamPartition = new StreamPartition();
- streamPartition.setStreamId(streamId);
- streamPartition.setColumns(groupByField);
- streamPartition.setType(StreamPartition.Type.GROUPBY);
- return streamPartition;
- }
-
- @SuppressWarnings("serial")
- public static PartitionedEvent createSimpleStreamEvent() {
- StreamEvent event = StreamEvent.Builder()
- .schema(createSampleStreamDefinition("sampleStream_1"))
- .streamId("sampleStream_1")
- .timestamep(System.currentTimeMillis())
- .attributes(new HashMap<String,Object>(){{
- put("name","cpu");
- put("host","localhost");
- put("flag",true);
- put("value",60.0);
- put("data",Long.MAX_VALUE);
- put("unknown","unknown column value");
- }}).build();
- PartitionedEvent pEvent = new PartitionedEvent();
- pEvent.setEvent(event);
- pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name","host")));
- return pEvent;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
deleted file mode 100644
index 0347d50..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import com.esotericsoftware.kryo.*;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.BitSet;
-
-
-public class PartitionedEventSerializerTest {
- private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
- @Test
- public void testPartitionEventSerialization() throws IOException {
- PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
- PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
- ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
- serializer.serialize(partitionedEvent,dataOutput1);
- byte[] serializedBytes = dataOutput1.toByteArray();
- PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
- Assert.assertEquals(partitionedEvent,deserializedEvent);
-
- PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
-
- byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
- PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
- Assert.assertEquals(partitionedEvent,deserializedEventCompressed);
-
- PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
- ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
- serializer2.serialize(partitionedEvent,dataOutput2);
- byte[] serializedBytes2 = dataOutput2.toByteArray();
- ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
- PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
- Assert.assertEquals(partitionedEvent,deserializedEvent2);
-
- byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
- Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
- Output output = new Output(10000);
- kryo.writeClassAndObject(output,partitionedEvent);
- byte[] kryoBytes = output.toBytes();
- Input input = new Input(kryoBytes);
- PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
- Assert.assertEquals(partitionedEvent,kryoDeserializedEvent);
- LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length);
- }
- @Test
- public void testPartitionEventSerializationEfficiency() throws IOException {
- PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
- PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
- int count = 100000;
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- int i = 0;
- while(i<count) {
- ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
- serializer.serialize(partitionedEvent, dataOutput1);
- byte[] serializedBytes = dataOutput1.toByteArray();
- PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
- Assert.assertEquals(partitionedEvent, deserializedEvent);
- i++;
- }
- stopWatch.stop();
- LOG.info("Cached Stream: {} ms",stopWatch.getTime());
- stopWatch.reset();
- PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition,true);
- i = 0;
- stopWatch.start();
- while(i<count) {
- byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
- PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
- Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
- i++;
- }
- stopWatch.stop();
- LOG.info("Compressed Cached Stream: {} ms",stopWatch.getTime());
- stopWatch.reset();
-
- i = 0;
- stopWatch.start();
- while(i<count) {
- PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
- ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
- serializer2.serialize(partitionedEvent, dataOutput2);
- byte[] serializedBytes2 = dataOutput2.toByteArray();
- ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
- PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
- Assert.assertEquals(partitionedEvent, deserializedEvent2);
- i++;
- }
- stopWatch.stop();
- LOG.info("Cached Stream&Partition: {} ms",stopWatch.getTime());
- stopWatch.reset();
- i = 0;
- stopWatch.start();
- while(i<count) {
- byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
- PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization);
- Assert.assertEquals(partitionedEvent, javaSerializedEvent);
- i++;
- }
- stopWatch.stop();
- LOG.info("Java Native: {} ms",stopWatch.getTime());
- stopWatch.reset();
- i = 0;
- stopWatch.start();
- Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
- while(i<count) {
- Output output = new Output(10000);
- kryo.writeClassAndObject(output, partitionedEvent);
- byte[] kryoBytes = output.toBytes();
- Input input = new Input(kryoBytes);
- PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
- Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
- i++;
- }
- stopWatch.stop();
- LOG.info("Kryo: {} ms",stopWatch.getTime());
- }
-
- /**
- * Kryo Serialization Length = Length of byte[] + 2
- */
- @Test
- public void testKryoByteArraySerialization(){
- Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
- byte[] bytes = new byte[]{0,1,2,3,4,5,6,7,8,9};
- Output output = new Output(1000);
- kryo.writeObject(output,bytes);
- Assert.assertEquals(bytes.length + 2,output.toBytes().length);
- }
-
- private byte[] kryoSerialize(Object object){
- Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
- Output output = new Output(100000);
- kryo.writeClassAndObject(output,object);
- return output.toBytes();
- }
-
- @Test
- public void testBitSet(){
- BitSet bitSet = new BitSet();
- bitSet.set(0,true); // 1
- bitSet.set(1,false); // 0
- bitSet.set(2,true); // 1
- LOG.info("Bit Set Size: {}",bitSet.size());
- LOG.info("Bit Set Byte[]: {}",bitSet.toByteArray());
- LOG.info("Bit Set Byte[]: {}",bitSet.toLongArray());
- LOG.info("BitSet[0]: {}",bitSet.get(0));
- LOG.info("BitSet[1]: {}",bitSet.get(1));
- LOG.info("BitSet[1]: {}",bitSet.get(2));
-
- byte[] bytes = bitSet.toByteArray();
-
- BitSet bitSet2 = BitSet.valueOf(bytes);
-
- LOG.info("Bit Set Size: {}",bitSet2.size());
- LOG.info("Bit Set Byte[]: {}",bitSet2.toByteArray());
- LOG.info("Bit Set Byte[]: {}",bitSet2.toLongArray());
- LOG.info("BitSet[0]: {}",bitSet2.get(0));
- LOG.info("BitSet[1]: {}",bitSet2.get(1));
- LOG.info("BitSet[1]: {}",bitSet2.get(2));
-
-
- BitSet bitSet3 = new BitSet();
- bitSet3.set(0,true);
- Assert.assertEquals(1,bitSet3.length());
-
- BitSet bitSet4 = new BitSet();
- bitSet4.set(0,false);
- Assert.assertEquals(0,bitSet4.length());
- Assert.assertFalse(bitSet4.get(1));
- Assert.assertFalse(bitSet4.get(2));
- }
-
- @Test
- public void testPeriod(){
- Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.parse("PT30m")));
- Assert.assertEquals(30*60*1000, TimePeriodUtils.getMillisecondsOfPeriod(Period.millis(30*60*1000)));
- Assert.assertEquals("PT1800S", Period.millis(30*60*1000).toString());
- }
-
- @Test
- public void testPartitionType(){
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
deleted file mode 100644
index f2f3b46..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-/**
- * @since Apr 1, 2016
- *
- */
-public class AttributeCollectAggregatorTest {
-
- private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
-
- @Test
- public void test() throws Exception {
- String ql = "define stream s1(timestamp long, host string, type string);";
- ql += " from s1#window.externalTime(timestamp, 1 sec)";
- ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
-
- SiddhiManager sm = new SiddhiManager();
- ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
-
- InputHandler input = runtime.getInputHandler("s1");
- runtime.addCallback("output", new StreamCallback() {
-
- @Override
- public void receive(Event[] arg0) {
- logger.info("output event length:" + arg0.length);
-
- for (Event e : arg0) {
- StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
- for (Object o : e.getData()) {
- sb.append("," + o);
- }
- logger.info(sb.toString());
- }
- logger.info("===end===");
- }
- });
-// StreamDefinition definition = (StreamDefinition) runtime.getStreamDefinitionMap().get("output");
-
- runtime.start();
-
- Event[] events = generateEvents();
- for (Event e : events) {
- input.send(e);
- }
-
- Thread.sleep(1000);
-
- }
-
- private Event[] generateEvents() {
- List<Event> events = new LinkedList<Event>();
-
- Random r = new Random();
- Event e = null;
- long base = System.currentTimeMillis();
- {
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova" });
- base += 100;
- events.add(e);
- }
-
- {
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron" });
- base += 100;
- events.add(e);
- }
-
- base += 10000;
- {
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "nova1" });
- base += 100;
- events.add(e);
- }
-
- {
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
- base += 100;
- events.add(e);
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "neutron2" });
- base += 100;
- events.add(e);
- }
- base += 10000;
- e = new Event(base, new Object[] { base, "host" + r.nextInt(), "mq" });
-
- return events.toArray(new Event[0]);
- }
-
- @Test
- public void testQuery() {
- String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
- ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
-
- SiddhiManager sm = new SiddhiManager();
- sm.createExecutionPlanRuntime(ql);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
deleted file mode 100644
index 613be00..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
-
-public class MapDBTestSuite {
- @Test
- public void testOnHeapDB(){
- DB db = DBMaker.heapDB().make();
- BTreeMap<Long,String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
- Assert.assertFalse(map.putIfAbsentBoolean(1L,"val_1"));
- Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_2"));
- Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_3"));
- Assert.assertFalse(map.putIfAbsentBoolean(2L,"val_4"));
-
- Assert.assertEquals("val_1",map.get(1L));
- Assert.assertEquals("val_4",map.get(2L));
-
- Assert.assertTrue(map.replace(2L,"val_4","val_5"));
- Assert.assertEquals("val_5",map.get(2L));
-
- map.close();
- db.close();
- }
-}