You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:05 UTC
[04/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
index 613be00..ebec509 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java
@@ -25,19 +25,19 @@ import org.mapdb.Serializer;
public class MapDBTestSuite {
@Test
- public void testOnHeapDB(){
+ public void testOnHeapDB() {
DB db = DBMaker.heapDB().make();
- BTreeMap<Long,String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
- Assert.assertFalse(map.putIfAbsentBoolean(1L,"val_1"));
- Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_2"));
- Assert.assertTrue(map.putIfAbsentBoolean(1L,"val_3"));
- Assert.assertFalse(map.putIfAbsentBoolean(2L,"val_4"));
+ BTreeMap<Long, String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create();
+ Assert.assertFalse(map.putIfAbsentBoolean(1L, "val_1"));
+ Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_2"));
+ Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_3"));
+ Assert.assertFalse(map.putIfAbsentBoolean(2L, "val_4"));
- Assert.assertEquals("val_1",map.get(1L));
- Assert.assertEquals("val_4",map.get(2L));
+ Assert.assertEquals("val_1", map.get(1L));
+ Assert.assertEquals("val_4", map.get(2L));
- Assert.assertTrue(map.replace(2L,"val_4","val_5"));
- Assert.assertEquals("val_5",map.get(2L));
+ Assert.assertTrue(map.replace(2L, "val_4", "val_5"));
+ Assert.assertEquals("val_5", map.get(2L));
map.close();
db.close();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
index 6cadba7..98657e7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java
@@ -16,14 +16,12 @@
*/
package org.apache.eagle.alert.engine.sorter;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
-
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import com.google.common.collect.Ordering;
import org.apache.commons.lang.time.StopWatch;
import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
@@ -38,12 +36,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Slf4jReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.google.common.collect.Ordering;
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
/**
* -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc
@@ -56,23 +51,24 @@ public class StreamSortHandlerTest {
}
private ScheduledReporter metricReporter;
+
@Before
- public void setUp(){
+ public void setUp() {
final MetricRegistry metrics = new MetricRegistry();
metrics.registerAll(new MemoryUsageGaugeSet());
metrics.registerAll(new GarbageCollectorMetricSet());
metricReporter = Slf4jReporter.forRegistry(metrics)
- .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
- .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
- metricReporter.start(60,TimeUnit.SECONDS);
+ .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage"))
+ .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ metricReporter.start(60, TimeUnit.SECONDS);
}
/**
* Used to debug window bucket lifecycle
- *
+ * <p>
* Window period: PT1s, margin: 5s
*
* @throws InterruptedException
@@ -83,18 +79,18 @@ public class StreamSortHandlerTest {
StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m",5000),mockCollector);
+ sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m", 5000), mockCollector);
List<PartitionedEvent> unsortedList = new LinkedList<>();
int i = 0;
- while(i<1000) {
+ while (i < 1000) {
PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
sortHandler.nextEvent(event);
unsortedList.add(event);
- if(event.getTimestamp()>timeClock.getTime()) {
+ if (event.getTimestamp() > timeClock.getTime()) {
timeClock.moveForward(event.getTimestamp());
}
- sortHandler.onTick(timeClock,System.currentTimeMillis());
+ sortHandler.onTick(timeClock, System.currentTimeMillis());
i++;
}
sortHandler.close();
@@ -114,24 +110,24 @@ public class StreamSortHandlerTest {
StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+ sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
List<PartitionedEvent> sortedList = new LinkedList<>();
int i = 0;
- while(i<1000000) {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+ while (i < 1000000) {
+ PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
sortHandler.nextEvent(event);
sortedList.add(event);
- if(event.getTimestamp()>timeClock.getTime()) {
+ if (event.getTimestamp() > timeClock.getTime()) {
timeClock.moveForward(event.getTimestamp());
}
- sortHandler.onTick(timeClock,System.currentTimeMillis());
+ sortHandler.onTick(timeClock, System.currentTimeMillis());
i++;
}
sortHandler.close();
Assert.assertTrue(timeOrdering.isOrdered(sortedList));
Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertEquals(1000000,mockCollector.get().size());
+ Assert.assertEquals(1000000, mockCollector.get().size());
}
/**
@@ -159,33 +155,33 @@ public class StreamSortHandlerTest {
StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+ sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
List<PartitionedEvent> unsortedList = new LinkedList<>();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
int i = 0;
- while(i<count) {
+ while (i < count) {
PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
sortHandler.nextEvent(event);
unsortedList.add(event);
- if(event.getEvent().getTimestamp()>timeClock.getTime()) {
+ if (event.getEvent().getTimestamp() > timeClock.getTime()) {
timeClock.moveForward(event.getEvent().getTimestamp());
}
- sortHandler.onTick(timeClock,System.currentTimeMillis());
+ sortHandler.onTick(timeClock, System.currentTimeMillis());
i++;
}
stopWatch.stop();
- LOG.info("Produced {} events in {} ms",count,stopWatch.getTime());
+ LOG.info("Produced {} events in {} ms", count, stopWatch.getTime());
sortHandler.close();
Assert.assertFalse(timeOrdering.isOrdered(unsortedList));
Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertTrue(mockCollector.get().size()>=0);
+ Assert.assertTrue(mockCollector.get().size() >= 0);
}
/**
* Used to debug window bucket lifecycle
- *
+ * <p>
* Window period: PT1h, margin: 5s
*
* @throws InterruptedException
@@ -196,29 +192,29 @@ public class StreamSortHandlerTest {
StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h",5000),mockCollector);
+ sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector);
List<PartitionedEvent> sortedList = new LinkedList<>();
int i = 0;
- while(i<1000000) {
- PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",System.currentTimeMillis()+i);
+ while (i < 1000000) {
+ PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i);
sortHandler.nextEvent(event);
sortedList.add(event);
- if(event.getTimestamp()>timeClock.getTime()) {
+ if (event.getTimestamp() > timeClock.getTime()) {
timeClock.moveForward(event.getTimestamp());
}
- sortHandler.onTick(timeClock,System.currentTimeMillis());
+ sortHandler.onTick(timeClock, System.currentTimeMillis());
i++;
}
sortHandler.close();
Assert.assertTrue(timeOrdering.isOrdered(sortedList));
Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertEquals(1000000,mockCollector.get().size());
+ Assert.assertEquals(1000000, mockCollector.get().size());
}
/**
* Used to debug window bucket lifecycle
- *
+ * <p>
* Window period: PT1h, margin: 5s
*
* @throws InterruptedException
@@ -229,26 +225,26 @@ public class StreamSortHandlerTest {
StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1");
Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl();
- sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s",1000),mockCollector);
+ sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s", 1000), mockCollector);
List<PartitionedEvent> sortedList = new LinkedList<>();
PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1");
sortHandler.nextEvent(event);
sortedList.add(event);
timeClock.moveForward(event.getTimestamp());
- sortHandler.onTick(timeClock,System.currentTimeMillis());
+ sortHandler.onTick(timeClock, System.currentTimeMillis());
// Triggered to become expired by System time
- sortHandler.onTick(timeClock,System.currentTimeMillis()+10*1000+1000L + 1);
+ sortHandler.onTick(timeClock, System.currentTimeMillis() + 10 * 1000 + 1000L + 1);
Assert.assertTrue(timeOrdering.isOrdered(sortedList));
Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get()));
- Assert.assertEquals(1,mockCollector.get().size());
+ Assert.assertEquals(1, mockCollector.get().size());
sortHandler.close();
}
-// @Test
+ // @Test
public void testWithTimerLock() throws InterruptedException {
Timer timer = new Timer();
List<Long> collected = new ArrayList<>();
@@ -265,6 +261,6 @@ public class StreamSortHandlerTest {
}
}
}
- },0,100);
+ }, 0, 100);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
index a2f0da6..3d32d72 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java
@@ -16,10 +16,11 @@
*/
package org.apache.eagle.alert.engine.sorter;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import org.apache.commons.lang.time.StopWatch;
import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -31,115 +32,120 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
@Ignore("Ignore automatic heavy benchmark test")
public class StreamWindowBenchmarkTest {
private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class);
+
public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) {
- LOGGER.info("Sending {} events to {} ({})",num,window.getClass().getSimpleName(),storageType);
+ LOGGER.info("Sending {} events to {} ({})", num, window.getClass().getSimpleName(), storageType);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
- int i=0;
- while(i<num) {
- PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1",(window.startTime()+i));
+ int i = 0;
+ while (i < num) {
+ PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1", (window.startTime() + i));
window.add(event);
i++;
}
stopWatch.stop();
- performanceReport.put(num+"\tInsertTime\t"+storageType,stopWatch.getTime());
- LOGGER.info("Inserted {} events in {} ms",num,stopWatch.getTime());
+ performanceReport.put(num + "\tInsertTime\t" + storageType, stopWatch.getTime());
+ LOGGER.info("Inserted {} events in {} ms", num, stopWatch.getTime());
stopWatch.reset();
stopWatch.start();
window.flush();
stopWatch.stop();
- performanceReport.put(num+"\tReadTime\t"+storageType,stopWatch.getTime());
+ performanceReport.put(num + "\tReadTime\t" + storageType, stopWatch.getTime());
}
private ScheduledReporter metricReporter;
- private Map<String,Long> performanceReport;
+ private Map<String, Long> performanceReport;
+
@Before
- public void setUp(){
+ public void setUp() {
final MetricRegistry metrics = new MetricRegistry();
metrics.registerAll(new MemoryUsageGaugeSet());
metrics.registerAll(new GarbageCollectorMetricSet());
metricReporter = ConsoleReporter.forRegistry(metrics)
- .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
+ .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)"))
// .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG)
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .build();
- metricReporter.start(60,TimeUnit.SECONDS);
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+ metricReporter.start(60, TimeUnit.SECONDS);
performanceReport = new TreeMap<>();
}
@After
- public void after(){
+ public void after() {
StringBuilder sb = new StringBuilder();
- for(Map.Entry<String,Long> entry:performanceReport.entrySet()){
- sb.append(String.format("%-40s\t%s\n",entry.getKey(),entry.getValue()));
+ for (Map.Entry<String, Long> entry : performanceReport.entrySet()) {
+ sb.append(String.format("%-40s\t%s\n", entry.getKey(), entry.getValue()));
}
- LOGGER.info("\n===== Benchmark Result Report =====\n\n{}",sb.toString());
+ LOGGER.info("\n===== Benchmark Result Report =====\n\n{}", sb.toString());
}
private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000");
- private final long margin = (stop - start)/3;
+ private final long margin = (stop - start) / 3;
- private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType){
+ private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
- LOGGER.info("\n===== Benchmark Test for {} ({}) =====",window.getClass().getSimpleName(),storageType);
+ LOGGER.info("\n===== Benchmark Test for {} ({}) =====", window.getClass().getSimpleName(), storageType);
metricReporter.report();
- sendDESCOrderedEventsToWindow(window,storageType,1000);
+ sendDESCOrderedEventsToWindow(window, storageType, 1000);
metricReporter.report();
- sendDESCOrderedEventsToWindow(window,storageType,10000);
+ sendDESCOrderedEventsToWindow(window, storageType, 10000);
metricReporter.report();
- sendDESCOrderedEventsToWindow(window,storageType,100000);
+ sendDESCOrderedEventsToWindow(window, storageType, 100000);
metricReporter.report();
- sendDESCOrderedEventsToWindow(window,storageType,1000000);
+ sendDESCOrderedEventsToWindow(window, storageType, 1000000);
metricReporter.report();
stopWatch.stop();
- LOGGER.info("\n===== Finished in total {} ms =====\n",stopWatch.getTime());
+ LOGGER.info("\n===== Finished in total {} ms =====\n", stopWatch.getTime());
}
- @Test @Ignore
- public void testStreamWindowBenchmarkMain(){
+ @Test
+ @Ignore
+ public void testStreamWindowBenchmarkMain() {
testStreamSortedWindowOnHeap();
testStreamSortedWindowInSerializedMemory();
testStreamSortedWindowOffHeap();
testStreamSortedWindowFile();
}
- @Test @Ignore
+ @Test
+ @Ignore
public void testStreamSortedWindowOnHeap() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
- benchmarkTest(window,StreamWindowRepository.StorageType.ONHEAP);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP);
+ benchmarkTest(window, StreamWindowRepository.StorageType.ONHEAP);
window.close();
}
- @Test @Ignore
+ @Test
+ @Ignore
public void testStreamSortedWindowInSerializedMemory() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
- benchmarkTest(window,StreamWindowRepository.StorageType.MEMORY);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY);
+ benchmarkTest(window, StreamWindowRepository.StorageType.MEMORY);
window.close();
}
- @Test @Ignore
+ @Test
+ @Ignore
public void testStreamSortedWindowOffHeap() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
- benchmarkTest(window,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
+ benchmarkTest(window, StreamWindowRepository.StorageType.DIRECT_MEMORY);
window.close();
}
- @Test @Ignore
+ @Test
+ @Ignore
public void testStreamSortedWindowFile() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
- benchmarkTest(window,StreamWindowRepository.StorageType.FILE_RAF);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF);
+ benchmarkTest(window, StreamWindowRepository.StorageType.FILE_RAF);
window.close();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
index 950aa34..6dc9c9f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java
@@ -16,8 +16,7 @@
*/
package org.apache.eagle.alert.engine.sorter;
-import java.util.List;
-
+import com.google.common.collect.Ordering;
import org.apache.eagle.alert.engine.mock.MockPartitionedCollector;
import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -29,7 +28,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Ordering;
+import java.util.List;
@SuppressWarnings("unused")
public class StreamWindowTestSuite {
@@ -37,34 +36,34 @@ public class StreamWindowTestSuite {
private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000");
private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000");
- private final long margin = (stop - start)/3;
+ private final long margin = (stop - start) / 3;
@Test
public void testStreamSortedWindowOnHeap() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.ONHEAP);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP);
streamSortedWindowMustTest(window);
}
@Test
public void testStreamSortedWindowInSerializedMemory() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.MEMORY);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY);
streamSortedWindowMustTest(window);
}
@Test
public void testStreamSortedWindowOffHeap() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.DIRECT_MEMORY);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
streamSortedWindowMustTest(window);
}
@Test
public void testStreamSortedWindowFile() {
- StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start,stop,margin,StreamWindowRepository.StorageType.FILE_RAF);
+ StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF);
streamSortedWindowMustTest(window);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private void streamSortedWindowMustTest(StreamWindow window){
+ @SuppressWarnings( {"unchecked", "rawtypes"})
+ private void streamSortedWindowMustTest(StreamWindow window) {
MockPartitionedCollector collector = new MockPartitionedCollector();
window.register(collector);
@@ -72,7 +71,7 @@ public class StreamWindowTestSuite {
clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"));
// Current time is: "2016-05-04 00:00:30"
- window.onTick(clock,System.currentTimeMillis());
+ window.onTick(clock, System.currentTimeMillis());
Assert.assertTrue(window.alive());
Assert.assertFalse(window.expired());
@@ -91,74 +90,74 @@ public class StreamWindowTestSuite {
Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")));
// Accepted
- Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+ Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
- Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
- Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
- Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
- Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
+ Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))));
+ Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))));
+ Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))));
+ Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))));
// Should accept Duplicated
- Assert.assertTrue("Should support duplicated timestamp",window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
+ Assert.assertTrue("Should support duplicated timestamp", window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))));
- Assert.assertEquals(6,window.size());
+ Assert.assertEquals(6, window.size());
// Rejected
- Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
- Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
- Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
+ Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))));
+ Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))));
+ Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))));
- Assert.assertEquals(6,window.size());
+ Assert.assertEquals(6, window.size());
// Now is: "2016-05-04 00:00:55"
clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000"));
- window.onTick(clock,System.currentTimeMillis());
+ window.onTick(clock, System.currentTimeMillis());
Assert.assertTrue(window.alive());
Assert.assertFalse(window.expired());
- Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
- Assert.assertEquals(7,window.size());
+ Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+ Assert.assertEquals(7, window.size());
// Flush when stream time delay too much after system time but window will still be alive
- window.onTick(clock,System.currentTimeMillis() + 1 + stop - start + margin);
+ window.onTick(clock, System.currentTimeMillis() + 1 + stop - start + margin);
Assert.assertTrue(window.alive());
Assert.assertFalse(window.expired());
- Assert.assertEquals(0,window.size());
- Assert.assertEquals(7,collector.size());
+ Assert.assertEquals(0, window.size());
+ Assert.assertEquals(7, collector.size());
Assert.assertFalse("Because window has flushed but not expired, window should reject future events < last flush stream time",
- window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
+ window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000"))));
Assert.assertTrue("Because window has flushed but not expired, window should still accept future events >= last flush stream time",
- window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
- Assert.assertEquals(1,window.size());
- Assert.assertEquals(7,collector.size());
+ window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"))));
+ Assert.assertEquals(1, window.size());
+ Assert.assertEquals(7, collector.size());
// Now is: "2016-05-04 00:01:10", not expire,
clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000"));
- window.onTick(clock,System.currentTimeMillis() + 2 * (1+ stop - start + margin));
- Assert.assertEquals(8,collector.size());
+ window.onTick(clock, System.currentTimeMillis() + 2 * (1 + stop - start + margin));
+ Assert.assertEquals(8, collector.size());
// Now is: "2016-05-04 00:01:20", expire
clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000"));
- window.onTick(clock,System.currentTimeMillis());
+ window.onTick(clock, System.currentTimeMillis());
Assert.assertFalse(window.alive());
Assert.assertTrue(window.expired());
- Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1",DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
- Assert.assertEquals(0,window.size());
+ Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"))));
+ Assert.assertEquals(0, window.size());
- Assert.assertEquals(8,collector.size());
+ Assert.assertEquals(8, collector.size());
Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE);
Assert.assertTrue(ordering.isOrdered(collector.get()));
List<PartitionedEvent> list = collector.get();
- Assert.assertEquals(8,list.size());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(0).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"),list.get(1).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"),list.get(2).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"),list.get(3).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"),list.get(4).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"),list.get(5).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"),list.get(6).getTimestamp());
- Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"),list.get(7).getTimestamp());
+ Assert.assertEquals(8, list.size());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(0).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(1).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"), list.get(2).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"), list.get(3).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"), list.get(4).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"), list.get(5).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"), list.get(6).getTimestamp());
+ Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"), list.get(7).getTimestamp());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
index b7e76b8..c59f0de 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java
@@ -16,9 +16,7 @@
*/
package org.apache.eagle.alert.engine.sorter;
-import java.util.Arrays;
-import java.util.Iterator;
-
+import com.google.common.collect.TreeMultiset;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -26,7 +24,8 @@ import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingCom
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.collect.TreeMultiset;
+import java.util.Arrays;
+import java.util.Iterator;
/**
* Since 5/10/16.
@@ -37,7 +36,7 @@ public class TreeMultisetComparatorTest {
* when they are added into TreeMultiset, the second event will be replaced by the first event
*/
@Test
- public void testComparator(){
+ public void testComparator() {
TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
// construct PartitionEvent1
@@ -50,7 +49,7 @@ public class TreeMultisetComparatorTest {
event1.setPartition(sp);
event1.setPartitionKey(1000);
StreamEvent e1 = new StreamEvent();
- e1.setData(new Object[]{18.4});
+ e1.setData(new Object[] {18.4});
e1.setStreamId("testStreamId");
e1.setTimestamp(1462909984000L);
event1.setEvent(e1);
@@ -60,7 +59,7 @@ public class TreeMultisetComparatorTest {
event2.setPartition(sp);
event2.setPartitionKey(1000);
StreamEvent e2 = new StreamEvent();
- e2.setData(new Object[]{16.3});
+ e2.setData(new Object[] {16.3});
e2.setStreamId("testStreamId");
e2.setTimestamp(1462909984000L);
event2.setEvent(e2);
@@ -70,7 +69,7 @@ public class TreeMultisetComparatorTest {
event3.setPartition(sp);
event3.setPartitionKey(1000);
StreamEvent e3 = new StreamEvent();
- e3.setData(new Object[]{14.3});
+ e3.setData(new Object[] {14.3});
e3.setStreamId("testStreamId");
e3.setTimestamp(1462909984001L);
event3.setEvent(e3);
@@ -79,13 +78,13 @@ public class TreeMultisetComparatorTest {
event4.setPartition(sp);
event4.setPartitionKey(1000);
StreamEvent e4 = new StreamEvent();
- e4.setData(new Object[]{14.3});
+ e4.setData(new Object[] {14.3});
e4.setStreamId("testStreamId");
e4.setTimestamp(1462909984001L);
event4.setEvent(e4);
- Assert.assertNotEquals(event2,event3);
- Assert.assertEquals(event3,event4);
+ Assert.assertNotEquals(event2, event3);
+ Assert.assertEquals(event3, event4);
// check content in set
set.add(event1);
@@ -96,15 +95,15 @@ public class TreeMultisetComparatorTest {
set.forEach(System.out::println);
- Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event2));
- Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1,event3));
- Assert.assertEquals(-1,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2,event3));
- Assert.assertEquals(0,PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3,event4));
+ Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event2));
+ Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event3));
+ Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2, event3));
+ Assert.assertEquals(0, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3, event4));
Iterator<PartitionedEvent> it = set.iterator();
- Assert.assertEquals(16.3,it.next().getData()[0]);
- Assert.assertEquals(18.4,it.next().getData()[0]);
- Assert.assertEquals(14.3,it.next().getData()[0]);
- Assert.assertEquals(14.3,it.next().getData()[0]);
+ Assert.assertEquals(16.3, it.next().getData()[0]);
+ Assert.assertEquals(18.4, it.next().getData()[0]);
+ Assert.assertEquals(14.3, it.next().getData()[0]);
+ Assert.assertEquals(14.3, it.next().getData()[0]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
index 8a5d616..1fc54a9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java
@@ -106,7 +106,7 @@ public class TestStateCheckPolicy {
mapdata.put("value", 1000.0 + i * 1000.0);
mapdata.put("colo", "phx");
- StreamEvent event = StreamEvent.Builder().timestamep(time).attributes(mapdata, definition).build();
+ StreamEvent event = StreamEvent.builder().timestamep(time).attributes(mapdata, definition).build();
PartitionedEvent pEvent = new PartitionedEvent(event, policyDefinition.getPartitionSpec().get(0), 1);
GeneralTopologyContext mock = Mockito.mock(GeneralTopologyContext.class);
@@ -122,12 +122,13 @@ public class TestStateCheckPolicy {
@NotNull
private Map<String, StreamDefinition> createStreamMap() throws Exception {
List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
- new TypeReference<List<StreamDefinition>>() {
- });
+ new TypeReference<List<StreamDefinition>>() {
+ });
return streams.stream().collect(Collectors.toMap(StreamDefinition::getStreamId, item -> item));
}
private static ObjectMapper mapper = new ObjectMapper();
+
static {
mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@@ -139,8 +140,8 @@ public class TestStateCheckPolicy {
spec.setTopologyName("testTopology");
List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
- new TypeReference<List<PolicyDefinition>>() {
- });
+ new TypeReference<List<PolicyDefinition>>() {
+ });
Assert.assertTrue(policies.size() > 0);
spec.addBoltPolicy("alertBolt1", policies.get(0).getName());
spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(policies));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
index 30a0ef9..7212785 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
@@ -19,10 +19,13 @@
package org.apache.eagle.alert.engine.topology;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.engine.spout.CorrelationSpout;
import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,19 +37,15 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.LocalCluster;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
-@SuppressWarnings({"serial", "unused"})
-public class AlertTopologyTest implements Serializable{
+@SuppressWarnings( {"serial", "unused"})
+public class AlertTopologyTest implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class);
char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'};
+
@Ignore
@Test
public void testMultipleTopics() throws Exception {
@@ -83,7 +82,7 @@ public class AlertTopologyTest implements Serializable{
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology());
- while(true) {
+ while (true) {
try {
Thread.sleep(1000);
} catch (Exception e) {
@@ -92,10 +91,10 @@ public class AlertTopologyTest implements Serializable{
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings( {"rawtypes", "unchecked"})
@Ignore
@Test
- public void generateRandomStringsToKafka(){
+ public void generateRandomStringsToKafka() {
String topic = "testTopic3";
int max = 1000;
Properties configMap = new Properties();
@@ -104,33 +103,34 @@ public class AlertTopologyTest implements Serializable{
configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configMap.put("request.required.acks", "1");
- configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
- configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+ configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
int i = 0;
- while(i++ < max) {
+ while (i++ < max) {
String randomString = generateRandomString();
System.out.println("sending string : " + randomString);
ProducerRecord record = new ProducerRecord(topic, randomString);
producer.send(record);
- if(i % 10 == 0){
+ if (i % 10 == 0) {
try {
Thread.sleep(10);
- }catch(Exception ex){
+ } catch (Exception ex) {
}
}
}
producer.close();
}
- private String generateRandomString(){
+ private String generateRandomString() {
long count = Math.round(Math.random() * 10);
- if(count == 0)
+ if (count == 0) {
count = 1;
+ }
StringBuilder sb = new StringBuilder();
- while(count-- > 0) {
- int index = (int)(Math.floor(Math.random()*26));
+ while (count-- > 0) {
+ int index = (int) (Math.floor(Math.random() * 26));
sb.append(alphabets[index]);
}
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
index deca9b4..d011770 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java
@@ -19,9 +19,15 @@
package org.apache.eagle.alert.engine.topology;
-import java.io.Serializable;
-import java.util.HashMap;
-
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.engine.spout.CorrelationSpout;
import org.apache.eagle.alert.engine.spout.CreateTopicUtils;
import org.apache.eagle.alert.utils.AlertConstants;
@@ -33,26 +39,19 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.io.Serializable;
+import java.util.HashMap;
/**
* Since 4/28/16.
*/
-@SuppressWarnings({"serial", "unused", "rawtypes"})
+@SuppressWarnings( {"serial", "unused", "rawtypes"})
public class CoordinatorSpoutIntegrationTest implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class);
+
@Ignore // this test need zookeeper
@Test
- public void testConfigNotify() throws Exception{
+ public void testConfigNotify() throws Exception {
final String topoId = "myTopology";
int numGroupbyBolts = 2;
int numTotalGroupbyBolts = 3;
@@ -77,14 +76,14 @@ public class CoordinatorSpoutIntegrationTest implements Serializable {
int numBolts = config.getInt("correlation.numGroupbyBolts");
String spoutId = "correlation-spout";
CorrelationSpout spout = new CorrelationSpout(config, topoId,
- new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
+ new MockMetadataChangeNotifyService(topoId, spoutId), numBolts);
SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout);
declarer.setNumTasks(2);
for (int i = 0; i < numBolts; i++) {
TestBolt bolt = new TestBolt();
BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt);
boltDecl.fieldsGrouping(spoutId,
- StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME+i), new Fields());
+ StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME + i), new Fields());
}
String topoName = config.getString("correlation.topologyName");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
index 5c5a37f..41e49fa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -19,12 +19,10 @@
package org.apache.eagle.alert.engine.topology;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
@@ -35,13 +33,13 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
-
import storm.kafka.KafkaSpoutWrapper;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
@Ignore
public class CorrelationSpoutTest {
@@ -50,17 +48,17 @@ public class CorrelationSpoutTest {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class);
private String dataSourceName = "ds-name";
- @SuppressWarnings({ "serial", "rawtypes" })
+ @SuppressWarnings( {"serial", "rawtypes"})
@Test
- public void testMetadataInjestion_emptyMetadata() throws Exception{
+ public void testMetadataInjestion_emptyMetadata() throws Exception {
String topoId = "testMetadataInjection";
Config config = ConfigFactory.load();
AtomicBoolean validated = new AtomicBoolean(false);
CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
@Override
protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context,
- SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
- throws Exception {
+ SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
+ throws Exception {
validated.set(true);
return null;
}
@@ -72,7 +70,7 @@ public class CorrelationSpoutTest {
ds.setTopic("name-of-topic1");
ds.setSchemeCls("PlainStringScheme");
ds.setCodec(new Tuple2StreamMetadata());
- Map<String,Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
+ Map<String, Kafka2TupleMetadata> dsMap = new HashMap<String, Kafka2TupleMetadata>();
dsMap.put(ds.getName(), ds);
StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1");
@@ -81,12 +79,12 @@ public class CorrelationSpoutTest {
dataSources.put(ds.getName(), Arrays.asList(m1));
SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap);
-
+
spout.onReload(newMetadata, null);
Assert.assertTrue(validated.get());
}
- @SuppressWarnings({ "serial", "rawtypes" })
+ @SuppressWarnings( {"serial", "rawtypes"})
@Test
public void testMetadataInjestion_oneNewTopic2Streams() throws Exception {
String topoId = "testMetadataInjection";
@@ -94,15 +92,15 @@ public class CorrelationSpoutTest {
Config config = ConfigFactory.load();
final AtomicBoolean verified = new AtomicBoolean(false);
- CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
+ CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
@Override
- protected KafkaSpoutWrapper createKafkaSpout(Map conf,
- TopologyContext context,
- SpoutOutputCollector collector,
- String topic,
- String topic2SchemeClsName,
- SpoutSpec streamMetadatas,
- Map<String, StreamDefinition> sds) {
+ protected KafkaSpoutWrapper createKafkaSpout(Map conf,
+ TopologyContext context,
+ SpoutOutputCollector collector,
+ String topic,
+ String topic2SchemeClsName,
+ SpoutSpec streamMetadatas,
+ Map<String, StreamDefinition> sds) {
Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size());
Assert.assertTrue(streamMetadatas.getStream("s1") != null);
Assert.assertTrue(streamMetadatas.getStream("s2") != null);
@@ -128,7 +126,7 @@ public class CorrelationSpoutTest {
private Map<String, Kafka2TupleMetadata> createDatasource(final String topicName, final String dataSourceName) {
Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
-
+
ds.setName(dataSourceName);
ds.setType("KAFKA");
ds.setProperties(new HashMap<String, String>());
@@ -140,9 +138,9 @@ public class CorrelationSpoutTest {
return dsMap;
}
- @SuppressWarnings({ "serial", "rawtypes" })
+ @SuppressWarnings( {"serial", "rawtypes"})
@Test
- public void testMetadataInjestion_deleteOneTopic() throws Exception{
+ public void testMetadataInjestion_deleteOneTopic() throws Exception {
String topoId = "testMetadataInjection";
final String topicName = "testTopic";
Config config = ConfigFactory.load();
@@ -151,11 +149,12 @@ public class CorrelationSpoutTest {
@Override
protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
String schemeClsName, SpoutSpec streamMetadatas,
- Map<String, StreamDefinition> sds){
+ Map<String, StreamDefinition> sds) {
return new KafkaSpoutWrapper(null, null);
}
+
@Override
- protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
+ protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
LOG.info("successfully verified removed topic and streams");
verified.set(true);
}
@@ -174,8 +173,8 @@ public class CorrelationSpoutTest {
// delete new topic
try {
spout.onReload(new SpoutSpec(topoId, new HashMap<String, List<StreamRepartitionMetadata>>(), new HashMap<>(), new HashMap<String, Kafka2TupleMetadata>()),
- null);
- }catch(Exception ex){
+ null);
+ } catch (Exception ex) {
LOG.error("error reloading spout metadata", ex);
throw ex;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
index 4779fac..5aff5c8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,23 +17,9 @@
*/
package org.apache.eagle.alert.engine.topology;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.ExecutorSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.SpoutStats;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.generated.TopologySummary;
+import backtype.storm.generated.*;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
@@ -46,6 +32,13 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
/**
* WordCount but the spout does not stop, and the bolts are implemented in
@@ -58,11 +51,11 @@ public class FastWordCountTopology {
SpoutOutputCollector _collector;
Random _rand;
private static final String[] CHOICES = {
- "marry had a little lamb whos fleese was white as snow",
- "and every where that marry went the lamb was sure to go",
- "one two three four five six seven eight nine ten",
- "this is a test of the emergency broadcast system this is only a test",
- "peter piper picked a peck of pickeled peppers"
+ "marry had a little lamb whos fleese was white as snow",
+ "and every where that marry went the lamb was sure to go",
+ "one two three four five six seven eight nine ten",
+ "this is a test of the emergency broadcast system this is only a test",
+ "peter piper picked a peck of pickeled peppers"
};
@SuppressWarnings("rawtypes")
@@ -76,7 +69,7 @@ public class FastWordCountTopology {
public void nextTuple() {
String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
_collector.emit(new Values(sentence), sentence);
- LOG.debug("Emit tuple: {}, id:{}",new Values(sentence),sentence);
+ LOG.debug("Emit tuple: {}, id:{}", new Values(sentence), sentence);
}
@Override
@@ -99,7 +92,7 @@ public class FastWordCountTopology {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
- for (String word: sentence.split("\\s+")) {
+ for (String word : sentence.split("\\s+")) {
collector.emit(new Values(word, 1));
}
}
@@ -118,8 +111,9 @@ public class FastWordCountTopology {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
- if (count == null)
+ if (count == null) {
count = 0;
+ }
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
@@ -134,26 +128,26 @@ public class FastWordCountTopology {
public static void printMetrics(Nimbus.Client client, String name) throws Exception {
ClusterSummary summary = client.getClusterInfo();
String id = null;
- for (TopologySummary ts: summary.get_topologies()) {
+ for (TopologySummary ts : summary.get_topologies()) {
if (name.equals(ts.get_name())) {
id = ts.get_id();
}
}
if (id == null) {
- throw new Exception("Could not find a topology named "+name);
+ throw new Exception("Could not find a topology named " + name);
}
TopologyInfo info = client.getTopologyInfo(id);
int uptime = info.get_uptime_secs();
long acked = 0;
long failed = 0;
double weightedAvgTotal = 0.0;
- for (ExecutorSummary exec: info.get_executors()) {
+ for (ExecutorSummary exec : info.get_executors()) {
if ("spout".equals(exec.get_component_id())) {
SpoutStats stats = exec.get_stats().get_specific().get_spout();
Map<String, Long> failedMap = stats.get_failed().get(":all-time");
Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
- for (String key: ackedMap.keySet()) {
+ for (String key : ackedMap.keySet()) {
if (failedMap != null) {
Long tmp = failedMap.get(key);
if (tmp != null) {
@@ -167,8 +161,8 @@ public class FastWordCountTopology {
}
}
}
- double avgLatency = weightedAvgTotal/acked;
- System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed));
+ double avgLatency = weightedAvgTotal / acked;
+ System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + (((double) acked) / uptime + " failed: " + failed));
}
public static void kill(Nimbus.Client client, String name) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
index a8fb6d0..1494313 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
@@ -19,10 +19,7 @@
package org.apache.eagle.alert.engine.topology;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
+import com.typesafe.config.Config;
import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.coordination.model.RouterSpec;
@@ -35,23 +32,25 @@ import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.typesafe.config.Config;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
/**
* Since 5/4/16.
*/
-@SuppressWarnings({"serial"})
+@SuppressWarnings( {"serial"})
public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements Runnable {
private final static Logger LOG = LoggerFactory.getLogger(MockMetadataChangeNotifyService.class);
@SuppressWarnings("unused")
- private static final String[] topics = new String[]{"testTopic3", "testTopic4", "testTopic5"};
+ private static final String[] topics = new String[] {"testTopic3", "testTopic4", "testTopic5"};
@SuppressWarnings("unused")
private String topologyName;
@SuppressWarnings("unused")
private String spoutId;
private Map<String, StreamDefinition> sds;
- public MockMetadataChangeNotifyService(String topologyName, String spoutId){
+ public MockMetadataChangeNotifyService(String topologyName, String spoutId) {
this.topologyName = topologyName;
this.spoutId = spoutId;
}
@@ -83,28 +82,29 @@ public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotif
}
}
- private Map<String, StreamDefinition> defineStreamDefinitions(){
+ private Map<String, StreamDefinition> defineStreamDefinitions() {
Map<String, StreamDefinition> sds = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamDefinitionsSpec.json"),
- new TypeReference<Map<String, StreamDefinition>>() {});
+ new TypeReference<Map<String, StreamDefinition>>() {
+ });
return sds;
}
- private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics){
+ private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics) {
SpoutSpec newSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testSpoutSpec.json"), SpoutSpec.class);
notifySpout(newSpec, sds);
}
- private void populateRouterMetadata(){
+ private void populateRouterMetadata() {
RouterSpec boltSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamRouterBoltSpec.json"), RouterSpec.class);
notifyStreamRouterBolt(boltSpec, sds);
}
- private void populateAlertBoltSpec(){
+ private void populateAlertBoltSpec() {
AlertBoltSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testAlertBoltSpec.json"), AlertBoltSpec.class);
notifyAlertBolt(spec, sds);
}
- private void notifyAlertPublishBolt(){
+ private void notifyAlertPublishBolt() {
PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
notifyAlertPublishBolt(spec, sds);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
index fa2f0a6..0ade06a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
@@ -18,6 +18,12 @@
*/
package org.apache.eagle.alert.engine.topology;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Ignore;
+import org.junit.Test;
+
import java.io.FileWriter;
import java.io.PrintWriter;
import java.io.Serializable;
@@ -27,22 +33,14 @@ import java.nio.file.Paths;
import java.util.List;
import java.util.Properties;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Ignore;
-import org.junit.Test;
-
/**
* Created on 3/12/16.
*/
-@SuppressWarnings({"serial", "unchecked", "rawtypes", "resource"})
-public class SendData2KafkaTest implements Serializable{
+@SuppressWarnings( {"serial", "unchecked", "rawtypes", "resource"})
+public class SendData2KafkaTest implements Serializable {
/**
- *
* {"timestamp": 10000, "metric": "esErrorLogEvent", "instanceUuid": "vm-InstanceId1", "host":"test-host1", "type":"nova", "stack":"NullPointException-..........."}
- {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
- *
+ * {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
*/
@Test
@Ignore
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
index 0ad6924..1c375fa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
@@ -20,27 +20,27 @@
package org.apache.eagle.alert.engine.topology;
-import java.util.Map;
-
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
/**
* Created by yonzhang on 4/7/16.
*/
@Ignore
-@SuppressWarnings({"rawtypes", "serial"})
+@SuppressWarnings( {"rawtypes", "serial"})
public class TestBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
private OutputCollector collector;
private long count;
+
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
@@ -50,7 +50,7 @@ public class TestBolt extends BaseRichBolt {
public void execute(Tuple input) {
LOG.info("data is coming: " + input);
count++;
- if(count % 10 == 0){
+ if (count % 10 == 0) {
LOG.info("count = " + count);
}
collector.ack(input);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
index 666d167..29986f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
@@ -19,26 +19,26 @@
package org.apache.eagle.alert.engine.topology;
-import java.nio.ByteBuffer;
-
import org.junit.Assert;
import org.junit.Test;
+import java.nio.ByteBuffer;
+
/**
* Storm 1.0.0 uses ByteBuffer, we need test it
*/
public class TestByteBuffer {
@Test
- public void testBB(){
+ public void testBB() {
ByteBuffer bb = ByteBuffer.allocate(100);
- bb.put((byte)12);
+ bb.put((byte) 12);
Assert.assertTrue(bb.hasArray());
bb.rewind();
Assert.assertEquals(12, bb.get());
}
@Test
- public void testMultipleStrings() throws Exception{
+ public void testMultipleStrings() throws Exception {
ByteBuffer bb = ByteBuffer.allocate(100);
bb.put("abc".getBytes("UTF-8"));
bb.put("xyz".getBytes("UTF-8"));