You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:22 UTC
[14/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
deleted file mode 100644
index 46517fe..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PublishPartition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.apache.eagle.alert.engine.publisher.AlertPublisher;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
-import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
-import org.apache.eagle.alert.engine.runner.MapComparator;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since 5/14/16.
- */
-public class TestAlertPublisherBolt {
-
- /**
-  * Feeds two freshly created alerts on "testAlertStream" through an
-  * {@link AlertPublisherImpl} set up from application-test.conf and /testPublishSpec.json.
-  * Each alert is routed to the partition of the first publishment in the spec.
-  * The test makes no assertions and is currently disabled via {@code @Ignore}.
-  */
- @SuppressWarnings("rawtypes")
- @Ignore
- @Test
- public void test() {
-     Config testConfig = ConfigFactory.load("application-test.conf");
-     AlertPublisher alertPublisher = new AlertPublisherImpl("alertPublishBolt");
-     alertPublisher.init(testConfig, new HashMap());
-
-     PublishSpec publishSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-     alertPublisher.onPublishChange(publishSpec.getPublishments(), null, null, null);
-
-     // Two identical rounds: build an alert, then hand it to the publisher.
-     for (int round = 0; round < 2; round++) {
-         AlertStreamEvent alert = create("testAlertStream");
-         Publishment target = publishSpec.getPublishments().get(0);
-         alertPublisher.nextEvent(new PublishPartition(alert.getStreamId(), alert.getPolicyId(),
-             target.getName(), target.getPartitionColumns()), alert);
-     }
- }
-
- /**
-  * Builds a minimal alert for policy "policy1" on the given stream, carrying three
-  * sample data values and the current wall-clock time as its creation time.
-  *
-  * @param streamId stream id to stamp on the alert
-  * @return the populated alert event
-  */
- private AlertStreamEvent create(String streamId) {
-     PolicyDefinition policyDefinition = new PolicyDefinition();
-     policyDefinition.setName("policy1");
-
-     AlertStreamEvent event = new AlertStreamEvent();
-     event.setStreamId(streamId);
-     event.setPolicyId(policyDefinition.getName());
-     event.setCreatedBy(toString());
-     event.setData(new Object[] {"field_1", 2, "field_3"});
-     event.setCreatedTime(System.currentTimeMillis());
-     return event;
- }
-
-
- /**
-  * Compares the publishments of /testPublishForAdd1.json (first comparator argument)
-  * with those of /testPublishForAdd0.json (second argument), keyed by publishment name,
-  * and expects exactly one entry to be reported as added.
-  */
- @Test
- public void testMapComparatorAdded() {
-     PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd1.json"), PublishSpec.class);
-     PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class);
-
-     Map<String, Publishment> map1 = new HashMap<>();
-     Map<String, Publishment> map2 = new HashMap<>();
-     spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-     spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-     MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-     comparator.compare();
-     // assertEquals reports expected vs. actual size on failure, unlike assertTrue(size == 1)
-     Assert.assertEquals(1, comparator.getAdded().size());
- }
-
- /**
-  * Compares the publishments of /testPublishForAdd0.json (first comparator argument)
-  * with those of /testPublishForAdd1.json (second argument), keyed by publishment name,
-  * and expects exactly one entry to be reported as removed.
-  */
- @Test
- public void testMapComparatorRemoved() {
-     PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class);
-     PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd1.json"), PublishSpec.class);
-
-     Map<String, Publishment> map1 = new HashMap<>();
-     Map<String, Publishment> map2 = new HashMap<>();
-     spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-     spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-     MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-     comparator.compare();
-     // assertEquals reports expected vs. actual size on failure, unlike assertTrue(size == 1)
-     Assert.assertEquals(1, comparator.getRemoved().size());
- }
-
- /**
-  * Compares the publishments of /testPublishForAdd0.json (first comparator argument)
-  * with those of /testPublishForMdyValue.json (second argument), keyed by publishment
-  * name, and expects exactly one entry to be reported as modified.
-  */
- @Test
- public void testMapComparatorModified() {
-     PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class);
-     PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForMdyValue.json"), PublishSpec.class);
-
-     Map<String, Publishment> map1 = new HashMap<>();
-     Map<String, Publishment> map2 = new HashMap<>();
-     spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-     spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-     MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-     comparator.compare();
-     // assertEquals reports expected vs. actual size on failure, unlike assertTrue(size == 1)
-     Assert.assertEquals(1, comparator.getModified().size());
- }
-
-
- /**
-  * Checks that comparing /testPublishSpec.json with /testPublishSpec2.json (keyed by
-  * publishment name) reports exactly one modified entry, then pushes three successive
-  * spec versions through an {@link AlertPublisherBolt}. The bolt part has no assertions;
-  * it passes as long as no exception is thrown.
-  */
- @Test
- public void testMapComparator() {
-     PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-     PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class);
-     PublishSpec spec3 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec3.json"), PublishSpec.class);
-     Map<String, Publishment> map1 = new HashMap<>();
-     Map<String, Publishment> map2 = new HashMap<>();
-     spec1.getPublishments().forEach(p -> map1.put(p.getName(), p));
-     spec2.getPublishments().forEach(p -> map2.put(p.getName(), p));
-
-     MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2);
-     comparator.compare();
-     // assertEquals reports expected vs. actual size on failure, unlike assertTrue(size == 1)
-     Assert.assertEquals(1, comparator.getModified().size());
-
-     AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null);
-     publisherBolt.onAlertPublishSpecChange(spec1, null);
-     publisherBolt.onAlertPublishSpecChange(spec2, null);
-     publisherBolt.onAlertPublishSpecChange(spec3, null);
- }
-
- /**
-  * Initializes an {@link AlertPublisherImpl} from application-test.conf and drives two
-  * publishment changes: first /publishments1.json passed as the first argument, then
-  * /publishments2.json and the earlier list passed as the third and fourth arguments.
-  * No assertions are made; the test passes when no exception is thrown.
-  */
- @SuppressWarnings("rawtypes")
- @Test
- public void testAlertPublisher() throws Exception {
-     AlertPublisher publisher = new AlertPublisherImpl("alert-publisher-test");
-     publisher.init(ConfigFactory.load("application-test.conf"), new HashMap());
-
-     List<Publishment> initial = loadEntities("/publishments1.json", Publishment.class);
-     List<Publishment> updated = loadEntities("/publishments2.json", Publishment.class);
-
-     publisher.onPublishChange(initial, null, null, null);
-     publisher.onPublishChange(null, null, updated, initial);
- }
-
- private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
- ObjectMapper objectMapper = new ObjectMapper();
- JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
- List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type);
- return l;
- }
-
- /**
-  * Builds an alert for policy "perfmon_cpu_host_check" on "testAlertStream" whose data is
-  * {@code {appName, hostname, state}} and whose schema declares the matching STRING
-  * columns "appname", "hostname" and "state", in that order.
-  *
-  * @param hostname value stored in the "hostname" column
-  * @param appName  value stored in the "appname" column
-  * @param state    value stored in the "state" column
-  * @return the populated alert event with its stream definition attached
-  */
- private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) {
-     AlertStreamEvent event = new AlertStreamEvent();
-     PolicyDefinition policyDefinition = new PolicyDefinition();
-     policyDefinition.setName("perfmon_cpu_host_check");
-     event.setPolicyId(policyDefinition.getName());
-     event.setCreatedTime(System.currentTimeMillis());
-     event.setData(new Object[] {appName, hostname, state});
-     event.setStreamId("testAlertStream");
-     event.setCreatedBy(this.toString());
-
-     // Schema: one STRING column per data slot, in data order.
-     StreamDefinition definition = new StreamDefinition();
-     String[] columnNames = {"appname", "hostname", "state"};
-     StreamColumn[] columns = new StreamColumn[columnNames.length];
-     for (int i = 0; i < columnNames.length; i++) {
-         StreamColumn column = new StreamColumn();
-         column.setName(columnNames[i]);
-         column.setType(StreamColumn.Type.STRING);
-         columns[i] = column;
-     }
-     definition.setColumns(Arrays.asList(columns));
-
-     event.setSchema(definition);
-     return event;
- }
-
- @Test
- public void testCustomFieldDedupEvent() throws Exception {
- List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class);
-
- AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
- AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN");
- AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1", "OPEN");
- AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2", "CLOSE");
-
- Assert.assertNotNull(plugin.dedup(event1));
- Assert.assertNull(plugin.dedup(event2));
- Assert.assertNotNull(plugin.dedup(event3));
- }
-
- /**
-  * Runs two alerts built from identical arguments through the dedup step of the plugin
-  * built from the first publishment in /router/publishments-empty-dedup-field.json.
-  * The assertions expect a non-null result for the first alert and null for the second.
-  */
- @Test
- public void testEmptyCustomFieldDedupEvent() throws Exception {
-     List<Publishment> publishments = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class);
-     AlertPublishPlugin dedupPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishments.get(0), null, null);
-
-     // Both alerts are created before either is passed to dedup.
-     AlertStreamEvent first = createWithStreamDef("host1", "testapp1", "OPEN");
-     AlertStreamEvent repeat = createWithStreamDef("host1", "testapp1", "OPEN");
-
-     Assert.assertNotNull(dedupPlugin.dedup(first));
-     Assert.assertNull(dedupPlugin.dedup(repeat));
- }
-
- /**
-  * Builds an alert for policy "switch_check" on "testAlertStream" whose data is
-  * {@code {appName, hostname, message, severity, docId, df_device, df_type, colo}} and
-  * whose schema declares eight STRING columns in the same order. The column that holds
-  * {@code colo} is named "dc".
-  *
-  * @return the populated alert event with its stream definition attached
-  */
- private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) {
-     AlertStreamEvent event = new AlertStreamEvent();
-     PolicyDefinition policyDefinition = new PolicyDefinition();
-     policyDefinition.setName("switch_check");
-     event.setPolicyId(policyDefinition.getName());
-     event.setCreatedTime(System.currentTimeMillis());
-     event.setData(new Object[] {appName, hostname, message, severity, docId, df_device, df_type, colo});
-     event.setStreamId("testAlertStream");
-     event.setCreatedBy(this.toString());
-
-     // Schema: one STRING column per data slot, in data order.
-     StreamDefinition definition = new StreamDefinition();
-     String[] columnNames = {
-         "appname", "hostname", "message", "severity", "docId", "df_device", "df_type", "dc"
-     };
-     StreamColumn[] columns = new StreamColumn[columnNames.length];
-     int slot = 0;
-     for (String columnName : columnNames) {
-         StreamColumn column = new StreamColumn();
-         column.setName(columnName);
-         column.setType(StreamColumn.Type.STRING);
-         columns[slot++] = column;
-     }
-     definition.setColumns(Arrays.asList(columns));
-
-     event.setSchema(definition);
-     return event;
- }
-
- /**
-  * Publishes three switch alerts through an {@link AlertPublisherImpl} configured with the
-  * publishments from /router/publishments-slack.json, each routed to the partition of the
-  * first publishment. No assertions are made; the test passes when no exception is thrown.
-  */
- @SuppressWarnings("rawtypes") // publisher.init is called with a raw HashMap, as in the sibling tests
- @Test
- public void testSlackPublishment() throws Exception {
-     Config config = ConfigFactory.load("application-test.conf");
-     AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt");
-     publisher.init(config, new HashMap());
-     List<Publishment> pubs = loadEntities("/router/publishments-slack.json", Publishment.class);
-     publisher.onPublishChange(pubs, null, null, null);
-
-     // All alerts are created before the first one is published, as before.
-     List<AlertStreamEvent> events = Arrays.asList(
-         createSeverityWithStreamDef("switch1", "testapp1", "Memory 1 inconsistency detected", "WARNING", "docId1", "ed01", "distribution switch", "us"),
-         createSeverityWithStreamDef("switch2", "testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", "distribution switch", "us"),
-         createSeverityWithStreamDef("switch2", "testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", "distribution switch", "us"));
-
-     for (AlertStreamEvent event : events) {
-         publisher.nextEvent(new PublishPartition(event.getStreamId(), event.getPolicyId(),
-             pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event);
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
deleted file mode 100644
index 704857d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
-import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.text.ParseException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-public class TestStreamRouterBoltOutputCollector {
-
- /**
-  * Checks that {@link StreamRouterBoltOutputCollector} emits exactly one tuple per event:
-  * once after a router spec with a single worker is added, and once more after that spec
-  * is replaced by one whose queue has two workers. Emits are counted by a stub
-  * {@link IOutputCollector} that records the target stream id of every emitted tuple.
-  */
- @Test
- public void testStreamRouterCollector() throws ParseException {
-     String streamId = "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX";
-     StreamPartition partition = new StreamPartition();
-     partition.setStreamId(streamId);
-     partition.setType(StreamPartition.Type.GROUPBY);
-     // Plain ArrayLists are used throughout instead of double-brace initialization, which
-     // creates an anonymous ArrayList subclass that captures the enclosing test instance.
-     List<String> groupByColumns = new ArrayList<>();
-     groupByColumns.add("col1");
-     partition.setColumns(groupByColumns);
-
-     // begin to create two router specs
-     WorkSlot worker1 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt1");
-     WorkSlot worker2 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt2");
-
-     PolicyWorkerQueue queue1 = new PolicyWorkerQueue();
-     queue1.setPartition(partition);
-     List<WorkSlot> singleWorker = new ArrayList<>();
-     singleWorker.add(worker1);
-     queue1.setWorkers(singleWorker);
-
-     PolicyWorkerQueue queue2 = new PolicyWorkerQueue();
-     queue2.setPartition(partition);
-     List<WorkSlot> twoWorkers = new ArrayList<>();
-     twoWorkers.add(worker1);
-     twoWorkers.add(worker2);
-     queue2.setWorkers(twoWorkers);
-
-     StreamRouterSpec spec1 = new StreamRouterSpec();
-     spec1.setStreamId(streamId);
-     spec1.setPartition(partition);
-     List<PolicyWorkerQueue> targetQueue1 = new ArrayList<>();
-     targetQueue1.add(queue1);
-     spec1.setTargetQueue(targetQueue1);
-
-     StreamRouterSpec spec2 = new StreamRouterSpec();
-     spec2.setStreamId(streamId);
-     spec2.setPartition(partition);
-     List<PolicyWorkerQueue> targetQueue2 = new ArrayList<>();
-     targetQueue2.add(queue2);
-     spec2.setTargetQueue(targetQueue2);
-     // the end of creating
-
-     // Stub collector: records the target stream id of each emit, ignores everything else.
-     List<String> targetStreamIds = new ArrayList<>();
-     IOutputCollector delegate = new IOutputCollector() {
-
-         @Override
-         public void reportError(Throwable error) {
-         }
-
-         @Override
-         public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-             targetStreamIds.add(streamId);
-             return null;
-         }
-
-         @Override
-         public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-         }
-
-         @Override
-         public void ack(Tuple input) {
-         }
-
-         @Override
-         public void fail(Tuple input) {
-         }
-
-     };
-
-     List<StreamRouterSpec> list1 = new ArrayList<>();
-     list1.add(spec1);
-
-     List<StreamRouterSpec> list2 = new ArrayList<>();
-     list2.add(spec2);
-
-     // construct StreamDefinition
-     StreamDefinition schema = new StreamDefinition();
-     schema.setStreamId(streamId);
-     StreamColumn column = new StreamColumn();
-     column.setName("col1");
-     column.setType(StreamColumn.Type.STRING);
-     schema.setColumns(Collections.singletonList(column));
-     Map<String, StreamDefinition> sds = new HashMap<>();
-     sds.put(schema.getStreamId(), schema);
-
-     // create two events
-     StreamEvent event1 = new StreamEvent();
-     Object[] data = new Object[]{"value1"};
-     event1.setData(data);
-     event1.setStreamId(streamId);
-     PartitionedEvent pEvent1 = new PartitionedEvent();
-     pEvent1.setEvent(event1);
-     pEvent1.setPartition(partition);
-
-     StreamEvent event2 = new StreamEvent();
-     Object[] data2 = new Object[]{"value3"};
-     event2.setData(data2);
-     event2.setStreamId(streamId);
-     PartitionedEvent pEvent2 = new PartitionedEvent();
-     pEvent2.setEvent(event2);
-     pEvent2.setPartition(partition);
-
-     TopologyContext context = Mockito.mock(TopologyContext.class);
-     when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-     StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
-     StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new StormOutputCollector(new OutputCollector(delegate), null), null, streamContext);
-
-     // add a StreamRouterSpec which has one worker
-     collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);
-     collector.emit(pEvent1);
-     // assertEquals reports the actual emit count on failure, unlike assertTrue(size == 1)
-     Assert.assertEquals(1, targetStreamIds.size());
-
-     // modify the StreamRouterSpec to contain two workers; the count is cumulative
-     collector.onStreamRouterSpecChange(new ArrayList<>(), new ArrayList<>(), list2, sds);
-     collector.emit(pEvent2);
-     Assert.assertEquals(2, targetStreamIds.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
deleted file mode 100644
index a480fcf..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStreamRouterBolt {
- private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class);
-
- /**
-  * Sends five mocked events through a {@link StreamRouterBolt} that has a sort spec
-  * (one-minute window, 1000 margin) and a router spec with two work slots.
-  * <p>
-  * 1. Sent out of time order:
-  * "value1","value2","value3","value4","value5"
-  * <p>
-  * 2. Received in timestamp order, with value5 dropped because it arrives too late:
-  * "value2","value1","value3","value4"
-  *
-  * @throws Exception if the bolt fails while preparing, executing or cleaning up
-  */
- @SuppressWarnings("rawtypes")
- @Test
- public void testRouterWithSortAndRouteSpec() throws Exception {
-     Config config = ConfigFactory.load();
-     MockChangeService mockChangeService = new MockChangeService();
-     StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService);
-
-     final Map<String, List<PartitionedEvent>> streamCollected = new HashMap<>();
-     final List<PartitionedEvent> orderCollected = new ArrayList<>();
-
-     // Stub collector: records every emitted event per target stream and in arrival order.
-     OutputCollector collector = new OutputCollector(new IOutputCollector() {
-
-         @Override
-         public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-             PartitionedEvent event;
-             try {
-                 event = routerBolt.deserialize(tuple.get(0));
-             } catch (IOException e) {
-                 throw new RuntimeException(e);
-             }
-             LOG.info("Collector received: [streamId=[{}], tuple=[{}]]", streamId, tuple);
-             streamCollected.computeIfAbsent(streamId, key -> new ArrayList<>()).add(event);
-             orderCollected.add(event);
-             return null;
-         }
-
-         @Override
-         public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
-         }
-
-         @Override
-         public void ack(Tuple input) {
-         }
-
-         @Override
-         public void fail(Tuple input) {
-         }
-
-         // NOTE(review): declared without @Override; presumably kept for Storm versions whose
-         // IOutputCollector declares resetTimeout - confirm before removing.
-         @SuppressWarnings("unused")
-         public void resetTimeout(Tuple input) {
-         }
-
-         @Override
-         public void reportError(Throwable error) {
-         }
-     });
-
-     Map stormConf = new HashMap<>();
-     TopologyContext topologyContext = mock(TopologyContext.class);
-     when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
-     routerBolt.prepare(stormConf, topologyContext, collector);
-
-     String streamId = "cpuUsageStream";
-     // StreamPartition, groupby col1 for stream cpuUsageStream
-     StreamPartition sp = new StreamPartition();
-     sp.setStreamId(streamId);
-     sp.setColumns(Collections.singletonList("col1"));
-     sp.setType(StreamPartition.Type.GROUPBY);
-
-     StreamSortSpec sortSpec = new StreamSortSpec();
-     sortSpec.setWindowPeriod2(Period.minutes(1));
-     sortSpec.setWindowMargin(1000);
-     sp.setSortSpec(sortSpec);
-
-     RouterSpec boltSpec = new RouterSpec();
-
-     // set StreamRouterSpec to have 2 WorkSlot
-     StreamRouterSpec routerSpec = new StreamRouterSpec();
-     routerSpec.setPartition(sp);
-     routerSpec.setStreamId(streamId);
-     PolicyWorkerQueue queue = new PolicyWorkerQueue();
-     queue.setPartition(sp);
-     queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt1"), new WorkSlot("testTopology", "alertBolt2")));
-     routerSpec.setTargetQueue(Collections.singletonList(queue));
-     boltSpec.addRouterSpec(routerSpec);
-     boltSpec.setVersion("version1");
-
-     // construct StreamDefinition
-     StreamDefinition schema = new StreamDefinition();
-     schema.setStreamId(streamId);
-     StreamColumn column = new StreamColumn();
-     column.setName("col1");
-     column.setType(StreamColumn.Type.STRING);
-     schema.setColumns(Collections.singletonList(column));
-     Map<String, StreamDefinition> sds = new HashMap<>();
-     sds.put(schema.getStreamId(), schema);
-
-     routerBolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2"));
-     routerBolt.onStreamRouteBoltSpecChange(boltSpec, sds);
-     GeneralTopologyContext context = mock(GeneralTopologyContext.class);
-     int taskId = 1;
-     when(context.getComponentId(taskId)).thenReturn("comp1");
-     when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
-
-     // =======================================
-     // Mock 5 events, sent out of time order. The last one ("value5") is older than
-     // everything sent before it and is expected to be dropped because it is too late.
-     // Expected arrival order: "value2","value1","value3","value4"
-     // =======================================
-     String[][] samples = {
-         {"2016-01-01 00:01:30", "value1"},
-         {"2016-01-01 00:01:10", "value2"},
-         {"2016-01-01 00:01:40", "value3"},
-         {"2016-01-01 00:02:10", "value4"},
-         {"2016-01-01 00:00:10", "value5"}
-     };
-     for (String[] sample : samples) {
-         Tuple input = new TupleImpl(context,
-             Collections.singletonList(createPartitionedEvent(streamId, sp, sample[0], sample[1])), taskId, "default");
-         routerBolt.execute(input);
-     }
-
-     Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
-     // the generated stream id is compared as-is; it must not be interpreted as a format string
-     Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1",
-         streamCollected.containsKey(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1")));
-     Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2",
-         streamCollected.containsKey(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2")));
-
-     Assert.assertEquals("Should finally collect 3 events", 3, orderCollected.size());
-     Assert.assertArrayEquals("Should sort 3 events in ASC order", new String[] {"value2", "value1", "value3"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
-
-     // The first 3 events are ticked automatically by window
-
-     routerBolt.cleanup();
-
-     // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock
-     // The 5th event will be thrown because too late and out of margin
-
-     Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size());
-     Assert.assertEquals("Should finally collect 4 events", 4, orderCollected.size());
-     Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp", new String[] {"value2", "value1", "value3", "value4"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray());
-
- }
-
- /**
-  * Builds a partitioned event carrying a single data value.
-  *
-  * @param streamId  stream id set on the wrapped stream event
-  * @param partition partition set on the partitioned event
-  * @param humanTime timestamp text passed to {@code DateTimeUtil.humanDateToSeconds}
-  * @param value     the only element of the event's data array
-  * @return the populated partitioned event
-  * @throws Exception if the timestamp text cannot be converted
-  */
- private static PartitionedEvent createPartitionedEvent(String streamId, StreamPartition partition, String humanTime, String value) throws Exception {
-     StreamEvent event = new StreamEvent();
-     event.setTimestamp(DateTimeUtil.humanDateToSeconds(humanTime) * 1000);
-     event.setData(new Object[] {value});
-     event.setStreamId(streamId);
-     PartitionedEvent partitionedEvent = new PartitionedEvent();
-     partitionedEvent.setEvent(event);
-     partitionedEvent.setPartition(partition);
-     return partitionedEvent;
- }
-
- /**
-  * Minimal {@link AbstractMetadataChangeNotifyService} used to construct the bolt under
-  * test; the only overridden behavior is {@code close()}, which just logs.
-  */
- @SuppressWarnings("serial")
- public static class MockChangeService extends AbstractMetadataChangeNotifyService {
-     private static final Logger LOG = LoggerFactory.getLogger(MockChangeService.class);
-
-     /** Logs that the service is closing; releases nothing. */
-     @Override
-     public void close() throws IOException {
-         LOG.info("Closing");
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
deleted file mode 100644
index a3939cc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package org.apache.eagle.alert.engine.serialization;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.utils.ByteUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class JavaSerializationTest {
-    private static final Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class);
-
-    /**
-     * Compares the size of a Java-serialized {@link PartitionedEvent} with a hand-computed
-     * "compact" size for the same fields, and asserts that Java serialization is more than
-     * 20 times larger.
-     */
-    @Test
-    public void testJavaSerialization() {
-        // Explicit charset so the computed byte counts do not depend on the platform default.
-        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
-
-        PartitionedEvent partitionedEvent = new PartitionedEvent();
-        partitionedEvent.setPartitionKey(partitionedEvent.hashCode());
-        partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream", Arrays.asList("name", "host")));
-        StreamEvent event = new StreamEvent();
-        event.setStreamId("sampleStream");
-        event.setTimestamp(System.currentTimeMillis());
-        event.setData(new Object[] {"CPU", "LOCALHOST", true, Long.MAX_VALUE, 60.0});
-        partitionedEvent.setEvent(event);
-
-        int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length;
-        LOG.info("Java serialization length: {}, event: {}", javaSerializationLength, partitionedEvent);
-
-        // Sum of the raw sizes of each field: stream id, partition hash, timestamp and the five data values.
-        int compactLength = 0;
-        compactLength += "sampleStream".getBytes(utf8).length;
-        compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length;
-        compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length;
-        compactLength += "CPU".getBytes(utf8).length;
-        compactLength += "LOCALHOST".getBytes(utf8).length;
-        compactLength += 1; // the boolean flag
-        compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length;
-        compactLength += ByteUtils.doubleToBytes(60.0).length;
-
-        LOG.info("Compact serialization length: {}, event: {}", compactLength, partitionedEvent);
-        Assert.assertTrue(
-            "Expected Java serialization (" + javaSerializationLength + " bytes) to be more than 20x the compact size ("
-                + compactLength + " bytes)",
-            compactLength * 20 < javaSerializationLength);
-    }
-
-    /**
-     * Builds a validated, time-series stream definition with the columns
-     * {@code name}, {@code host} (STRING), {@code flag} (BOOL), {@code data} (LONG) and {@code value} (DOUBLE).
-     *
-     * @param streamId id assigned to the definition and used in its description
-     * @return the populated stream definition
-     */
-    public static StreamDefinition createSampleStreamDefinition(String streamId) {
-        StreamDefinition sampleStreamDefinition = new StreamDefinition();
-        sampleStreamDefinition.setStreamId(streamId);
-        sampleStreamDefinition.setTimeseries(true);
-        sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for " + streamId);
-
-        List<StreamColumn> streamColumns = new ArrayList<>();
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
-        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
-        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-        sampleStreamDefinition.setColumns(streamColumns);
-        return sampleStreamDefinition;
-    }
-
-    /**
-     * Builds a GROUPBY partition over the given columns of a stream.
-     *
-     * @param streamId     stream the partition applies to
-     * @param groupByField names of the columns to group by
-     * @return the populated partition
-     */
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) {
-        StreamPartition streamPartition = new StreamPartition();
-        streamPartition.setStreamId(streamId);
-        streamPartition.setColumns(groupByField);
-        streamPartition.setType(StreamPartition.Type.GROUPBY);
-        return streamPartition;
-    }
-
-    /**
-     * Builds a partitioned event on {@code sampleStream_1}, grouped by {@code name} and {@code host}.
-     * The attribute map deliberately contains an {@code unknown} key that is not a column of the
-     * sample stream definition.
-     *
-     * @return the populated partitioned event
-     */
-    public static PartitionedEvent createSimpleStreamEvent() {
-        // Plain map instead of double-brace initialisation: no anonymous HashMap subclass is created.
-        HashMap<String, Object> attributes = new HashMap<>();
-        attributes.put("name", "cpu");
-        attributes.put("host", "localhost");
-        attributes.put("flag", true);
-        attributes.put("value", 60.0);
-        attributes.put("data", Long.MAX_VALUE);
-        attributes.put("unknown", "unknown column value");
-
-        StreamEvent event = StreamEvent.builder()
-            .schema(createSampleStreamDefinition("sampleStream_1"))
-            .streamId("sampleStream_1")
-            .timestamep(System.currentTimeMillis())
-            .attributes(attributes)
-            .build();
-        PartitionedEvent pEvent = new PartitionedEvent();
-        pEvent.setEvent(event);
-        pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name", "host")));
-        return pEvent;
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
deleted file mode 100644
index 5a81e26..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.serialization;
-
-import backtype.storm.serialization.DefaultKryoFactory;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.BitSet;
-
-
-/**
- * Round-trip and timing tests comparing the Eagle partitioned-event serializers with
- * Java native serialization and Kryo.
- */
-public class PartitionedEventSerializerTest {
-    private static final Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class);
-
-    /**
-     * Serializes one event with each serializer, asserts that every round trip yields an equal
-     * event, and logs the resulting byte sizes.
-     */
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testPartitionEventSerialization() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
-        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-
-        ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-        serializer.serialize(partitionedEvent, dataOutput1);
-        byte[] serializedBytes = dataOutput1.toByteArray();
-        PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-        Assert.assertEquals(partitionedEvent, deserializedEvent);
-
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
-
-        byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
-        PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-        Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
-
-        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
-        ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-        serializer2.serialize(partitionedEvent, dataOutput2);
-        byte[] serializedBytes2 = dataOutput2.toByteArray();
-        ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
-        PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-        Assert.assertEquals(partitionedEvent, deserializedEvent2);
-
-        byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent);
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        Output output = new Output(10000);
-        kryo.writeClassAndObject(output, partitionedEvent);
-        byte[] kryoBytes = output.toBytes();
-        Input input = new Input(kryoBytes);
-        PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-        Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
-        LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}", serializedBytes.length, serializedBytesCompressed.length, serializedBytes2.length, javaSerialization.length, kryoBytes.length, kryoSerialize(serializedBytes).length, kryoSerialize(serializedBytes2).length);
-    }
-
-    /**
-     * Runs 100,000 serialize/deserialize round trips per serializer and logs the elapsed time of each.
-     * Every serializer is created once, outside its timed loop, so the timings cover the round
-     * trips only and not object construction.
-     */
-    @SuppressWarnings("deprecation")
-    @Test
-    public void testPartitionEventSerializationEfficiency() throws IOException {
-        PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis());
-        final int count = 100000;
-        StopWatch stopWatch = new StopWatch();
-
-        PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition);
-        stopWatch.start();
-        for (int i = 0; i < count; i++) {
-            ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput();
-            serializer.serialize(partitionedEvent, dataOutput1);
-            byte[] serializedBytes = dataOutput1.toByteArray();
-            PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes));
-            Assert.assertEquals(partitionedEvent, deserializedEvent);
-        }
-        stopWatch.stop();
-        LOG.info("Cached Stream: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-
-        PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true);
-        stopWatch.start();
-        for (int i = 0; i < count; i++) {
-            byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent);
-            PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed);
-            Assert.assertEquals(partitionedEvent, deserializedEventCompressed);
-        }
-        stopWatch.stop();
-        LOG.info("Compressed Cached Stream: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-
-        PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition);
-        stopWatch.start();
-        for (int i = 0; i < count; i++) {
-            ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput();
-            serializer2.serialize(partitionedEvent, dataOutput2);
-            byte[] serializedBytes2 = dataOutput2.toByteArray();
-            ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2);
-            PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2);
-            Assert.assertEquals(partitionedEvent, deserializedEvent2);
-        }
-        stopWatch.stop();
-        LOG.info("Cached Stream&Partition: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-
-        DefaultSerializationDelegate javaDelegate = new DefaultSerializationDelegate();
-        stopWatch.start();
-        for (int i = 0; i < count; i++) {
-            byte[] javaSerialization = javaDelegate.serialize(partitionedEvent);
-            PartitionedEvent javaSerializedEvent = (PartitionedEvent) javaDelegate.deserialize(javaSerialization);
-            Assert.assertEquals(partitionedEvent, javaSerializedEvent);
-        }
-        stopWatch.stop();
-        LOG.info("Java Native: {} ms", stopWatch.getTime());
-        stopWatch.reset();
-
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        stopWatch.start();
-        for (int i = 0; i < count; i++) {
-            Output output = new Output(10000);
-            kryo.writeClassAndObject(output, partitionedEvent);
-            byte[] kryoBytes = output.toBytes();
-            Input input = new Input(kryoBytes);
-            PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input);
-            Assert.assertEquals(partitionedEvent, kryoDeserializedEvent);
-        }
-        stopWatch.stop();
-        LOG.info("Kryo: {} ms", stopWatch.getTime());
-    }
-
-    /**
-     * Kryo Serialization Length = Length of byte[] + 2
-     */
-    @Test
-    public void testKryoByteArraySerialization() {
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        byte[] bytes = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
-        Output output = new Output(1000);
-        kryo.writeObject(output, bytes);
-        Assert.assertEquals(bytes.length + 2, output.toBytes().length);
-    }
-
-    /** Serializes {@code object} (class and value) with a fresh default Kryo and returns the bytes. */
-    private byte[] kryoSerialize(Object object) {
-        Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault();
-        Output output = new Output(100000);
-        kryo.writeClassAndObject(output, object);
-        return output.toBytes();
-    }
-
-    /**
-     * Checks {@link BitSet} behaviour: a byte-array round trip preserves the set bits, and
-     * {@link BitSet#length()} only counts up to the highest bit that is set.
-     */
-    @Test
-    public void testBitSet() {
-        BitSet bitSet = new BitSet();
-        bitSet.set(0, true); // 1
-        bitSet.set(1, false); // 0
-        bitSet.set(2, true); // 1
-        logBitSet(bitSet);
-
-        BitSet bitSet2 = BitSet.valueOf(bitSet.toByteArray());
-        logBitSet(bitSet2);
-        // Assert the round trip instead of only logging it.
-        Assert.assertEquals(bitSet, bitSet2);
-
-        BitSet bitSet3 = new BitSet();
-        bitSet3.set(0, true);
-        Assert.assertEquals(1, bitSet3.length());
-
-        // Explicitly clearing a bit does not extend the logical length.
-        BitSet bitSet4 = new BitSet();
-        bitSet4.set(0, false);
-        Assert.assertEquals(0, bitSet4.length());
-        Assert.assertFalse(bitSet4.get(1));
-        Assert.assertFalse(bitSet4.get(2));
-    }
-
-    /** Logs the size, the byte/long array forms and the first three bits of {@code bits}. */
-    private static void logBitSet(BitSet bits) {
-        LOG.info("Bit Set Size: {}", bits.size());
-        LOG.info("Bit Set Byte[]: {}", bits.toByteArray());
-        LOG.info("Bit Set Long[]: {}", bits.toLongArray());
-        LOG.info("BitSet[0]: {}", bits.get(0));
-        LOG.info("BitSet[1]: {}", bits.get(1));
-        LOG.info("BitSet[2]: {}", bits.get(2));
-    }
-
-    @Test
-    public void testPartitionType() {
-        // Placeholder: no partition-type assertions have been written yet.
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
deleted file mode 100644
index 9520b62..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.siddhi;
-
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Exercises Siddhi execution plans (group-by, aggregation with {@code having}, regex filter,
- * sequence pattern and {@code str:concat}) using in-memory input handlers.
- *
- * @since Jun 21, 2016
- */
-public class SiddhiPolicyTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class);
-
-    /** Siddhi definition of {@code syslog_stream}; prepended to the syslog queries below. */
-    private static final String STREAMS = " define stream syslog_stream("
-        + "dims_facility string, "
-        + "dims_severity string, "
-        + "dims_hostname string, "
-        + "dims_msgid string, "
-        + "timestamp string, "
-        + "conn string, "
-        + "op string, "
-        + "msgId string, "
-        + "command string, "
-        + "name string, "
-        + "namespace string, "
-        + "epochMillis long); ";
-
-    private SiddhiManager sm;
-
-    @Before
-    public void setup() {
-        sm = new SiddhiManager();
-    }
-
-    @After
-    public void shutdown() {
-        sm.shutdown();
-    }
-
-    /** Verifies that a group-by query over a time window can be compiled and started. */
-    @Test
-    public void testPolicy_grpby() {
-        String ql = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                // No events are sent in this test; only plan creation and start-up are exercised.
-            }
-        };
-
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(STREAMS + ql);
-        try {
-            runtime.addCallback("syslog_severity_check_output", sc);
-            runtime.start();
-        } finally {
-            runtime.shutdown();
-        }
-    }
-
-    /**
-     * Sends 15 events spread over four hosts and expects output only for the three hosts that
-     * reach more than 3 events (HOSTNAME-0..2 receive 4 each, HOSTNAME-3 receives 3).
-     */
-    @Ignore
-    @Test
-    public void testPolicy_agg() throws Exception {
-        String sql = " from syslog_stream#window.time(1min) select "
-            + "name, "
-            + "namespace, "
-            + "timestamp, "
-            + "dims_hostname, "
-            + "count(*) as abortCount "
-            + "group by dims_hostname "
-            + "having abortCount > 3 insert into syslog_severity_check_output; ";
-
-        final AtomicBoolean checked = new AtomicBoolean(false);
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                checked.set(true);
-                LOG.info("event array size: " + arg0.length);
-                Set<String> hosts = new HashSet<String>();
-                for (Event e : arg0) {
-                    hosts.add((String) e.getData()[3]);
-                }
-
-                LOG.info(" grouped hosts : " + hosts);
-                // NOTE(review): these assertions run inside the Siddhi callback; confirm that a
-                // failure here actually propagates to the JUnit thread.
-                Assert.assertTrue(hosts.contains("HOSTNAME-" + 0));
-                Assert.assertTrue(hosts.contains("HOSTNAME-" + 1));
-                Assert.assertTrue(hosts.contains("HOSTNAME-" + 2));
-                Assert.assertFalse(hosts.contains("HOSTNAME-" + 3));
-            }
-        };
-
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(STREAMS + sql);
-        try {
-            runtime.addCallback("syslog_severity_check_output", sc);
-            runtime.start();
-            InputHandler handler = runtime.getInputHandler("syslog_stream");
-
-            sendInput(handler);
-
-            Thread.sleep(1000);
-
-            Assert.assertTrue(checked.get());
-        } finally {
-            runtime.shutdown();
-        }
-    }
-
-    /**
-     * Builds a 12-field event matching the {@code syslog_stream} definition. Only the host name
-     * and the {@code op} field vary between the events used in this class.
-     *
-     * @param hostname value for {@code dims_hostname}
-     * @param op       value for {@code op}
-     * @return the populated event, time-stamped with the current time
-     */
-    private static Event newSyslogEvent(String hostname, String op) {
-        Event event = new Event(12);
-        event.setTimestamp(System.currentTimeMillis());
-        event.setData(new Object[] {"facitliy", "SEVERITY_EMERG", hostname, "MSGID-...", "Timestamp", "conn-sss", op, "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
-        return event;
-    }
-
-    /**
-     * Sends a batch of 15 "Abort" events over hosts HOSTNAME-0..3, waits 61 seconds (longer than
-     * the one-minute window used by the queries), then sends one further event from HOSTNAME-11.
-     */
-    private void sendInput(InputHandler handler) throws Exception {
-        int length = 15;
-        Event[] events = new Event[length];
-        for (int i = 0; i < length; i++) {
-            events[i] = newSyslogEvent("HOSTNAME-" + i % 4, "op-msg-Abort");
-        }
-
-        handler.send(events);
-
-        Thread.sleep(61 * 1000);
-
-        handler.send(newSyslogEvent("HOSTNAME-" + 11, "op-msg"));
-    }
-
-    /** Expects the regex-filtered, grouped query to emit at least one output batch. */
-    @Ignore
-    @Test
-    public void testPolicy_regex() throws Exception {
-        String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
-
-        final AtomicBoolean checked = new AtomicBoolean();
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                checked.set(true);
-            }
-        };
-
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(STREAMS + sql);
-        try {
-            runtime.addCallback("syslog_severity_check_output", sc);
-            runtime.start();
-
-            InputHandler handler = runtime.getInputHandler("syslog_stream");
-
-            sendInput(handler);
-
-            Thread.sleep(1000);
-
-            Assert.assertTrue(checked.get());
-        } finally {
-            runtime.shutdown();
-        }
-    }
-
-    /**
-     * Expects the sequence pattern "UPDOWN followed by Abort on the same host within one minute"
-     * to emit at least one output batch.
-     */
-    @Ignore
-    @Test
-    public void testPolicy_seq() throws Exception {
-        String sql = ""
-            + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
-            + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
-            + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
-            + " insert into syslog_severity_check_output; ";
-
-        final AtomicBoolean checked = new AtomicBoolean();
-        StreamCallback sc = new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                checked.set(true);
-            }
-        };
-
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(STREAMS + sql);
-        try {
-            runtime.addCallback("syslog_severity_check_output", sc);
-            runtime.start();
-            InputHandler handler = runtime.getInputHandler("syslog_stream");
-
-            sendPatternInput(handler);
-
-            Thread.sleep(1000);
-            Assert.assertTrue(checked.get());
-        } finally {
-            runtime.shutdown();
-        }
-    }
-
-    /**
-     * Sends UPDOWN, an unrelated event and Abort from HOSTNAME-0, waits 61 seconds, then sends
-     * one event from HOSTNAME-11.
-     *
-     * <p>Each event is sent as soon as it is built. Previously the first three events were
-     * constructed and then overwritten without being sent, so the pattern could never match.
-     */
-    private void sendPatternInput(InputHandler handler) throws Exception {
-        // validate one
-        handler.send(newSyslogEvent("HOSTNAME-" + 0, "op-msg-UPDOWN"));
-        handler.send(newSyslogEvent("HOSTNAME-" + 0, "op-msg-nothing"));
-        handler.send(newSyslogEvent("HOSTNAME-" + 0, "op-msg-Abort"));
-
-        Thread.sleep(61 * 1000);
-
-        handler.send(newSyslogEvent("HOSTNAME-" + 11, "op-msg"));
-    }
-
-    /** Sends one event through a {@code str:concat} projection and prints whatever is emitted. */
-    @Test
-    public void testStrConcat() throws Exception {
-        String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
-            " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; ";
-        // Use the manager created in setup() so that shutdown() releases it after the test.
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
-        try {
-            runtime.addCallback("output", new StreamCallback() {
-                @Override
-                public void receive(Event[] events) {
-                    EventPrinter.print(events);
-                }
-            });
-
-            runtime.start();
-
-            InputHandler logInput = runtime.getInputHandler("log");
-
-            Event e = new Event();
-            e.setTimestamp(System.currentTimeMillis());
-            e.setData(new Object[] {System.currentTimeMillis(), "switch-ra-slc-01", "port01", "log-message...."});
-            logInput.send(e);
-
-            Thread.sleep(1000);
-        } finally {
-            runtime.shutdown();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
deleted file mode 100644
index 7694623..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.siddhi.extension;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Smoke tests for Siddhi queries that use the {@code eagle:collect} aggregator. The tests only
- * log the output; they make no assertions about the collected values.
- *
- * @since Apr 1, 2016
- */
-public class AttributeCollectAggregatorTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
-
-    /** Number of events generated per type by {@link #addBatch}. */
-    private static final int EVENTS_PER_TYPE = 3;
-
-    /** Timestamp step, in milliseconds, between consecutive events of a batch. */
-    private static final long EVENT_INTERVAL_MS = 100;
-
-    /**
-     * Feeds generated events through an external-time window grouped by {@code type} and logs
-     * every output batch.
-     */
-    @Test
-    public void test() throws Exception {
-        String ql = "define stream s1(timestamp long, host string, type string);";
-        ql += " from s1#window.externalTime(timestamp, 1 sec)";
-        ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
-
-        SiddhiManager sm = new SiddhiManager();
-        try {
-            ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
-            try {
-                InputHandler input = runtime.getInputHandler("s1");
-                runtime.addCallback("output", new StreamCallback() {
-
-                    @Override
-                    public void receive(Event[] arg0) {
-                        logger.info("output event length:" + arg0.length);
-
-                        for (Event e : arg0) {
-                            StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
-                            for (Object o : e.getData()) {
-                                sb.append(',').append(o);
-                            }
-                            logger.info(sb.toString());
-                        }
-                        logger.info("===end===");
-                    }
-                });
-
-                runtime.start();
-
-                for (Event e : generateEvents()) {
-                    input.send(e);
-                }
-
-                Thread.sleep(1000);
-            } finally {
-                runtime.shutdown();
-            }
-        } finally {
-            sm.shutdown();
-        }
-    }
-
-    /**
-     * Appends {@link #EVENTS_PER_TYPE} events of the given type, spaced {@link #EVENT_INTERVAL_MS}
-     * apart and starting at {@code base}, each with a random host name.
-     *
-     * @return the timestamp following the last generated event
-     */
-    private static long addBatch(List<Event> events, Random random, long base, String type) {
-        long timestamp = base;
-        for (int i = 0; i < EVENTS_PER_TYPE; i++) {
-            events.add(new Event(timestamp, new Object[] {timestamp, "host" + random.nextInt(), type}));
-            timestamp += EVENT_INTERVAL_MS;
-        }
-        return timestamp;
-    }
-
-    /**
-     * Generates two groups of events ten seconds apart (nova/neutron, then nova1/neutron2),
-     * followed ten seconds later by a single trailing {@code mq} event.
-     *
-     * <p>The trailing event used to be constructed but never added to the result, so it was
-     * never sent; it is now included.
-     */
-    private Event[] generateEvents() {
-        List<Event> events = new LinkedList<Event>();
-        Random r = new Random();
-        long base = System.currentTimeMillis();
-
-        base = addBatch(events, r, base, "nova");
-        base = addBatch(events, r, base, "neutron");
-
-        base += 10000;
-        base = addBatch(events, r, base, "nova1");
-        base = addBatch(events, r, base, "neutron2");
-
-        base += 10000;
-        events.add(new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"}));
-
-        return events.toArray(new Event[0]);
-    }
-
-    /** Verifies that a {@code min(...)} / {@code having} query over a length window compiles. */
-    @Test
-    public void testQuery() {
-        String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
-        ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
-
-        SiddhiManager sm = new SiddhiManager();
-        try {
-            sm.createExecutionPlanRuntime(ql);
-        } finally {
-            sm.shutdown();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
deleted file mode 100644
index ebec509..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.mapdb.Serializer;
-
-/**
- * Exploratory test that pins down how an on-heap MapDB {@link BTreeMap} behaves for
- * conditional puts, lookups and conditional replacement.
- */
-public class MapDBTestSuite {
-    /**
-     * Creates a heap-backed {@code Long -> String} tree map and checks that a key keeps the
-     * first value stored for it, and that {@code replace} swaps a value when the expected
-     * old value matches.
-     */
-    @Test
-    public void testOnHeapDB() {
-        DB db = DBMaker.heapDB().make();
-        try {
-            BTreeMap<Long, String> map = db.treeMap("btree")
-                .keySerializer(Serializer.LONG)
-                .valueSerializer(Serializer.STRING)
-                .create();
-            try {
-                // As asserted here, putIfAbsentBoolean yields false for the first insert of a key
-                // and true when the key already holds a value (the stored value is left untouched).
-                // NOTE(review): this reads inverted relative to the method name - confirm it is the
-                // intended contract of the MapDB version in use before relying on it elsewhere.
-                Assert.assertFalse(map.putIfAbsentBoolean(1L, "val_1"));
-                Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_2"));
-                Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_3"));
-                Assert.assertFalse(map.putIfAbsentBoolean(2L, "val_4"));
-
-                // The first value written for each key must still be the one stored.
-                Assert.assertEquals("val_1", map.get(1L));
-                Assert.assertEquals("val_4", map.get(2L));
-
-                // Conditional replace succeeds because the expected old value matches.
-                Assert.assertTrue(map.replace(2L, "val_4", "val_5"));
-                Assert.assertEquals("val_5", map.get(2L));
-            } finally {
-                // Close the map even when an assertion above fails.
-                map.close();
-            }
-        } finally {
-            db.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
deleted file mode 100644
index ff7b8ee..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.google.common.collect.Ordering;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
-import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;
-import org.apache.eagle.common.DateTimeUtil;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.ManagementFactory;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-/**
- * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
- */
-public class StreamSortHandlerTest {
- private final static Logger LOG = LoggerFactory.getLogger(StreamSortHandlerTest.class);
-
- static {
- LOG.info(ManagementFactory.getRuntimeMXBean().getName());
- }
-
- private ScheduledReporter metricReporter;
-
- @Before
- public void setUp() {
- final MetricRegistry metrics = new MetricRegistry();
- metrics.registerAll(new MemoryUsageGaugeSet());
- metrics.registerAll(new GarbageCollectorMetricSet());
- metricReporter = Slf4jReporter.forRegistry(metrics)
- .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
- .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
- metricReporter.start(60, TimeUnit.SECONDS);
- }
-
- /**
- * Used to debug window bucket lifecycle
- * <p>
- * Window period: PT1s, margin: 5s
- *
- * @throws InterruptedException
- */
- @Test
- public void testWithUnsortedEventsIn1MinuteWindow() throws InterruptedException {
- MockPartitionedCollector mockCollector = new MockPartitionedCollector();
- StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
- Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
- StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m", 5000), mockCollector);
- List<PartitionedEvent> unsortedList = new LinkedList<>();
-
- int i = 0;
- while (i < 1000) {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
- sortHandler.nextEvent(event);
- unsortedList.add(event);
- if (event.getTimestamp() > timeClock.getTime()) {
- timeClock.moveForward(event.getTimestamp());
- }
- sortHandler.onTick(timeClock, System.currentTimeMillis());
- i++;
- }
- sortHandler.close();
- Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
- Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertTrue(mockCollector.get().size() > 0);
- }
-
- @Test
- public void testStreamSortHandlerWithUnsortedEventsIn1HourWindow() throws InterruptedException {
- testWithUnsortedEventsIn1hWindow(1000000);
- }
-
- @Test
- public void testSortedInPatient() throws InterruptedException {
- MockPartitionedCollector mockCollector = new MockPartitionedCollector();
- StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
- Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
- StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
- List<PartitionedEvent> sortedList = new LinkedList<>();
-
- int i = 0;
- while (i < 1000000) {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
- sortHandler.nextEvent(event);
- sortedList.add(event);
- if (event.getTimestamp() > timeClock.getTime()) {
- timeClock.moveForward(event.getTimestamp());
- }
- sortHandler.onTick(timeClock, System.currentTimeMillis());
- i++;
- }
- sortHandler.close();
- Assert.assertTrue(timeOrdering.isOrdered(sortedList));
- Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertEquals(1000000, mockCollector.get().size());
- }
-
- /**
- * -XX:+PrintGC
- *
- * @throws InterruptedException
- */
- @Test @Ignore("Igored heavy benchmark test in unit test")
- public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException {
- metricReporter.report();
- testWithUnsortedEventsIn1hWindow(1000);
- metricReporter.report();
- testWithUnsortedEventsIn1hWindow(10000);
- metricReporter.report();
- testWithUnsortedEventsIn1hWindow(100000);
- metricReporter.report();
- testWithUnsortedEventsIn1hWindow(1000000);
- metricReporter.report();
- testWithUnsortedEventsIn1hWindow(10000000);
- metricReporter.report();
- }
-
- public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException {
- MockPartitionedCollector mockCollector = new MockPartitionedCollector();
- StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
- Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
- StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
- List<PartitionedEvent> unsortedList = new LinkedList<>();
-
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- int i = 0;
- while (i < count) {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
- sortHandler.nextEvent(event);
- unsortedList.add(event);
- if (event.getEvent().getTimestamp() > timeClock.getTime()) {
- timeClock.moveForward(event.getEvent().getTimestamp());
- }
- sortHandler.onTick(timeClock, System.currentTimeMillis());
- i++;
- }
- stopWatch.stop();
- LOG.info("Produced {} events in {} ms", count, stopWatch.getTime());
- sortHandler.close();
- Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
- Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertTrue(mockCollector.get().size() >= 0);
- }
-
- /**
- * Used to debug window bucket lifecycle
- * <p>
- * Window period: PT1h, margin: 5s
- *
- * @throws InterruptedException
- */
- @Test
- public void testWithSortedEvents() throws InterruptedException {
- MockPartitionedCollector mockCollector = new MockPartitionedCollector();
- StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
- Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
- StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
- List<PartitionedEvent> sortedList = new LinkedList<>();
-
- int i = 0;
- while (i < 1000000) {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
- sortHandler.nextEvent(event);
- sortedList.add(event);
- if (event.getTimestamp() > timeClock.getTime()) {
- timeClock.moveForward(event.getTimestamp());
- }
- sortHandler.onTick(timeClock, System.currentTimeMillis());
- i++;
- }
- sortHandler.close();
- Assert.assertTrue(timeOrdering.isOrdered(sortedList));
- Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertEquals(1000000, mockCollector.get().size());
- }
-
- /**
- * Used to debug window bucket lifecycle
- * <p>
- * Window period: PT1h, margin: 5s
- *
- * @throws InterruptedException
- */
- @Test
- public void testWithSortedEventsAndExpireBySystemTime() throws InterruptedException {
- MockPartitionedCollector mockCollector = new MockPartitionedCollector();
- StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
- Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
- StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s", 1000), mockCollector);
- List<PartitionedEvent> sortedList = new LinkedList<>();
-
- PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
- sortHandler.nextEvent(event);
- sortedList.add(event);
- timeClock.moveForward(event.getTimestamp());
- sortHandler.onTick(timeClock, System.currentTimeMillis());
-
- // Triggered to become expired by System time
- sortHandler.onTick(timeClock, System.currentTimeMillis() + 10 * 1000 + 1000L + 1);
-
- Assert.assertTrue(timeOrdering.isOrdered(sortedList));
- Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertEquals(1, mockCollector.get().size());
-
- sortHandler.close();
- }
-
- // @Test
- public void testWithTimerLock() throws InterruptedException {
- Timer timer = new Timer();
- List<Long> collected = new ArrayList<>();
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- synchronized (collected) {
- LOG.info("Ticking {}", DateTimeUtil.millisecondsToHumanDateWithMilliseconds(System.currentTimeMillis()));
- collected.add(System.currentTimeMillis());
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }, 0, 100);
- }
-}
\ No newline at end of file