You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:49 UTC
[19/52] [abbrv] metron git commit: METRON-590 Enable Use of Event
Time in Profiler (nickwallen) closes apache/metron#965
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
new file mode 100644
index 0000000..b8949c5
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@code FixedFrequencyFlushSignal} class.
+ */
+public class FixedFrequencyFlushSignalTest {
+
+ @Test
+ public void testSignalFlush() {
+
+ FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+ // not time to flush yet
+ assertFalse(signal.isTimeToFlush());
+
+ // advance time
+ signal.update(5000);
+
+ // not time to flush yet
+ assertFalse(signal.isTimeToFlush());
+
+ // advance time
+ signal.update(7000);
+
+ // time to flush
+ assertTrue(signal.isTimeToFlush());
+ }
+
+ @Test
+ public void testOutOfOrderTimestamps() {
+ FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+ // advance time, out-of-order
+ signal.update(5000);
+ signal.update(1000);
+ signal.update(7000);
+ signal.update(3000);
+
+ // a flush is due at 5000 + 1000 = 6000; once any timestamp > 6000 has been seen (even out-of-order), it should signal a flush
+ assertTrue(signal.isTimeToFlush());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNegativeFrequency() {
+ new FixedFrequencyFlushSignal(-1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
deleted file mode 100644
index c3f2584..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.statistics.OnlineStatisticsProvider;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-/**
- * Tests the KafkaDestinationHandler.
- */
-public class KafkaDestinationHandlerTest {
-
- /**
- * {
- * "profile": "profile-one-destination",
- * "foreach": "ip_src_addr",
- * "init": { "x": "0" },
- * "update": { "x": "x + 1" },
- * "result": "x"
- * }
- */
- @Multiline
- private String profileDefinition;
-
- private KafkaDestinationHandler handler;
- private ProfileConfig profile;
- private OutputCollector collector;
-
- @Before
- public void setup() throws Exception {
- handler = new KafkaDestinationHandler();
- profile = createDefinition(profileDefinition);
- collector = Mockito.mock(OutputCollector.class);
- }
-
- /**
- * The handler must serialize the ProfileMeasurement into a JSONObject.
- */
- @Test
- public void testSerialization() throws Exception {
-
- ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
- .withEntity("entity")
- .withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
- .withDefinition(profile);
- handler.emit(measurement, collector);
-
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-
- // expect a JSONObject
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
-
- // validate the json
- JSONObject actual = (JSONObject) values.get(0);
- assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
- assertEquals(measurement.getEntity(), actual.get("entity"));
- assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
- assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
- assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
- assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
- assertNotNull(actual.get("timestamp"));
- assertEquals("profiler", actual.get("source.type"));
- }
-
- /**
- * Values destined for Kafka can only be serialized into text, which limits the types of values
- * that can result from a triage expression. Only primitive types and Strings are allowed.
- */
- @Test
- public void testInvalidType() throws Exception {
-
- // create one invalid expression and one valid expression
- Map<String, Object> triageValues = ImmutableMap.of(
- "invalid", new OnlineStatisticsProvider(),
- "valid", 4);
-
- ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
- .withEntity("entity")
- .withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(triageValues)
- .withDefinition(profile);
- handler.emit(measurement, collector);
-
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
-
- // only the triage expression value itself should have been skipped, all others should be there
- JSONObject actual = (JSONObject) values.get(0);
- assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
- assertEquals(measurement.getEntity(), actual.get("entity"));
- assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
- assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
- assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
- assertNotNull(actual.get("timestamp"));
- assertEquals("profiler", actual.get("source.type"));
-
- // the invalid expression should be skipped due to invalid type
- assertFalse(actual.containsKey("invalid"));
-
- // but the valid expression should still be there
- assertEquals(triageValues.get("valid"), actual.get("valid"));
- }
-
- /**
- * Values destined for Kafka can only be serialized into text, which limits the types of values
- * that can result from a triage expression. Only primitive types and Strings are allowed.
- */
- @Test
- public void testIntegerIsValidType() throws Exception {
- ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
- .withEntity("entity")
- .withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(Collections.singletonMap("triage-key", 123))
- .withDefinition(profile);
- handler.emit(measurement, collector);
-
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
- JSONObject actual = (JSONObject) values.get(0);
-
- // the triage expression is valid
- assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
- }
-
- /**
- * Values destined for Kafka can only be serialized into text, which limits the types of values
- * that can result from a triage expression. Only primitive types and Strings are allowed.
- */
- @Test
- public void testStringIsValidType() throws Exception {
- ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
- .withEntity("entity")
- .withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(Collections.singletonMap("triage-key", "value"))
- .withDefinition(profile);
- handler.emit(measurement, collector);
-
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
- JSONObject actual = (JSONObject) values.get(0);
-
- // the triage expression is valid
- assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
- }
-
- /**
- * Creates a profile definition based on a string of JSON.
- * @param json The string of JSON.
- */
- private ProfileConfig createDefinition(String json) throws IOException {
- return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
new file mode 100644
index 0000000..b02e377
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the KafkaEmitter.
+ */
+public class KafkaEmitterTest {
+
+ /**
+ * {
+ * "profile": "profile-one-destination",
+ * "foreach": "ip_src_addr",
+ * "init": { "x": "0" },
+ * "update": { "x": "x + 1" },
+ * "result": "x"
+ * }
+ */
+ @Multiline
+ private String profileDefinition;
+
+ private KafkaEmitter handler;
+ private ProfileConfig profile;
+ private OutputCollector collector;
+
+ @Before
+ public void setup() throws Exception {
+ handler = new KafkaEmitter();
+ profile = createDefinition(profileDefinition);
+ collector = Mockito.mock(OutputCollector.class);
+ }
+
+ /**
+ * The handler must serialize the ProfileMeasurement into a JSONObject.
+ */
+ @Test
+ public void testSerialization() throws Exception {
+
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
+ .withDefinition(profile);
+ handler.emit(measurement, collector);
+
+ ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+
+ // expect a JSONObject
+ Values values = arg.getValue();
+ assertTrue(values.get(0) instanceof JSONObject);
+
+ // validate the json
+ JSONObject actual = (JSONObject) values.get(0);
+ assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+ assertEquals(measurement.getEntity(), actual.get("entity"));
+ assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+ assertNotNull(actual.get("timestamp"));
+ assertEquals("profiler", actual.get("source.type"));
+ }
+
+ /**
+ * Values destined for Kafka can only be serialized into text, which limits the types of values
+ * that can result from a triage expression. Only primitive types and Strings are allowed.
+ */
+ @Test
+ public void testInvalidType() throws Exception {
+
+ // create one invalid expression and one valid expression
+ Map<String, Object> triageValues = ImmutableMap.of(
+ "invalid", new OnlineStatisticsProvider(),
+ "valid", 4);
+
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withTriageValues(triageValues)
+ .withDefinition(profile);
+ handler.emit(measurement, collector);
+
+ ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+ Values values = arg.getValue();
+ assertTrue(values.get(0) instanceof JSONObject);
+
+ // only the triage expression value itself should have been skipped, all others should be there
+ JSONObject actual = (JSONObject) values.get(0);
+ assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+ assertEquals(measurement.getEntity(), actual.get("entity"));
+ assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertNotNull(actual.get("timestamp"));
+ assertEquals("profiler", actual.get("source.type"));
+
+ // the invalid expression should be skipped due to invalid type
+ assertFalse(actual.containsKey("invalid"));
+
+ // but the valid expression should still be there
+ assertEquals(triageValues.get("valid"), actual.get("valid"));
+ }
+
+ /**
+ * Values destined for Kafka can only be serialized into text, which limits the types of values
+ * that can result from a triage expression. Only primitive types and Strings are allowed.
+ */
+ @Test
+ public void testIntegerIsValidType() throws Exception {
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withTriageValues(Collections.singletonMap("triage-key", 123))
+ .withDefinition(profile);
+ handler.emit(measurement, collector);
+
+ ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+ Values values = arg.getValue();
+ assertTrue(values.get(0) instanceof JSONObject);
+ JSONObject actual = (JSONObject) values.get(0);
+
+ // the triage expression is valid
+ assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+ }
+
+ /**
+ * Values destined for Kafka can only be serialized into text, which limits the types of values
+ * that can result from a triage expression. Only primitive types and Strings are allowed.
+ */
+ @Test
+ public void testStringIsValidType() throws Exception {
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withTriageValues(Collections.singletonMap("triage-key", "value"))
+ .withDefinition(profile);
+ handler.emit(measurement, collector);
+
+ ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+ Values values = arg.getValue();
+ assertTrue(values.get(0) instanceof JSONObject);
+ JSONObject actual = (JSONObject) values.get(0);
+
+ // the triage expression is valid
+ assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+ }
+
+ /**
+ * Creates a profile definition based on a string of JSON.
+ * @param json The string of JSON.
+ */
+ private ProfileConfig createDefinition(String json) throws IOException {
+ return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 21d61ab..78e20e0 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -20,35 +20,37 @@
package org.apache.metron.profiler.bolt;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.profiler.MessageDistributor;
import org.apache.metron.profiler.MessageRoute;
-import org.apache.metron.profiler.ProfileBuilder;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.profiler.integration.MessageBuilder;
import org.apache.metron.test.bolt.BaseBoltTest;
-import org.apache.storm.Constants;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -59,284 +61,348 @@ import static org.mockito.Mockito.when;
*/
public class ProfileBuilderBoltTest extends BaseBoltTest {
- /**
- * {
- * "ip_src_addr": "10.0.0.1",
- * "value": "22"
- * }
- */
- @Multiline
- private String inputOne;
- private JSONObject messageOne;
+ private JSONObject message1;
+ private JSONObject message2;
+ private ProfileConfig profile1;
+ private ProfileConfig profile2;
+ private ProfileMeasurementEmitter emitter;
+ private ManualFlushSignal flushSignal;
- /**
- * {
- * "ip_src_addr": "10.0.0.2",
- * "value": "22"
- * }
- */
- @Multiline
- private String inputTwo;
- private JSONObject messageTwo;
+ @Before
+ public void setup() throws Exception {
+
+ message1 = new MessageBuilder()
+ .withField("ip_src_addr", "10.0.0.1")
+ .withField("value", "22")
+ .build();
+
+ message2 = new MessageBuilder()
+ .withField("ip_src_addr", "10.0.0.2")
+ .withField("value", "22")
+ .build();
+
+ profile1 = new ProfileConfig()
+ .withProfile("profile1")
+ .withForeach("ip_src_addr")
+ .withInit("x", "0")
+ .withUpdate("x", "x + 1")
+ .withResult("x");
+
+ profile2 = new ProfileConfig()
+ .withProfile("profile2")
+ .withForeach("ip_src_addr")
+ .withInit(Collections.singletonMap("x", "0"))
+ .withUpdate(Collections.singletonMap("x", "x + 1"))
+ .withResult("x");
+
+ flushSignal = new ManualFlushSignal();
+ flushSignal.setFlushNow(false);
+ }
/**
- * {
- * "profile": "profileOne",
- * "foreach": "ip_src_addr",
- * "init": { "x": "0" },
- * "update": { "x": "x + 1" },
- * "result": "x"
- * }
+ * The bolt should extract a message and timestamp from a tuple and
+ * pass that to a {@code MessageDistributor}.
*/
- @Multiline
- private String profileOne;
+ @Test
+ public void testExtractMessage() throws Exception {
+ ProfileBuilderBolt bolt = createBolt();
- /**
- * {
- * "profile": "profileTwo",
- * "foreach": "ip_src_addr",
- * "init": { "x": "0" },
- * "update": { "x": "x + 1" },
- * "result": "x"
- * }
- */
- @Multiline
- private String profileTwo;
+ // create a mock
+ MessageDistributor distributor = mock(MessageDistributor.class);
+ bolt.withMessageDistributor(distributor);
- public static Tuple mockTickTuple() {
- Tuple tuple = mock(Tuple.class);
- when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
- when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
- return tuple;
- }
+ // create a tuple
+ final long timestamp1 = 100000000L;
+ Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1);
- @Before
- public void setup() throws Exception {
- JSONParser parser = new JSONParser();
- messageOne = (JSONObject) parser.parse(inputOne);
- messageTwo = (JSONObject) parser.parse(inputTwo);
+ // execute the bolt
+ TupleWindow tupleWindow = createWindow(tuple1);
+ bolt.execute(tupleWindow);
+
+ // the message should have been extracted from the tuple and passed to the MessageDistributor
+ verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any());
}
+
/**
- * Creates a profile definition based on a string of JSON.
- * @param json The string of JSON.
+ * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
+ * and emit the {@code ProfileMeasurement} values.
*/
- private ProfileConfig createDefinition(String json) throws IOException {
- return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+ @Test
+ public void testEmitWhenFlush() throws Exception {
+
+ ProfileBuilderBolt bolt = createBolt();
+
+ // create a profile measurement
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withEntity("entity1")
+ .withProfileName("profile1")
+ .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+ .withProfileValue(22);
+
+ // create a mock that returns the profile measurement above
+ MessageDistributor distributor = mock(MessageDistributor.class);
+ when(distributor.flush()).thenReturn(Collections.singletonList(m));
+ bolt.withMessageDistributor(distributor);
+
+ // signal the bolt to flush
+ flushSignal.setFlushNow(true);
+
+ // execute the bolt
+ Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
+ TupleWindow tupleWindow = createWindow(tuple1);
+ bolt.execute(tupleWindow);
+
+ // a profile measurement should be emitted by the bolt
+ List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
+ assertEquals(1, measurements.size());
+ assertEquals(m, measurements.get(0));
}
/**
- * Create a tuple that will contain the message, the entity name, and profile definition.
- * @param entity The entity name
- * @param message The telemetry message.
- * @param profile The profile definition.
+ * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
*/
- private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile) {
- Tuple tuple = mock(Tuple.class);
- when(tuple.getValueByField(eq("message"))).thenReturn(message);
- when(tuple.getValueByField(eq("entity"))).thenReturn(entity);
- when(tuple.getValueByField(eq("profile"))).thenReturn(profile);
- return tuple;
+ @Test
+ public void testDoNotEmitWhenNoFlush() throws Exception {
+
+ ProfileBuilderBolt bolt = createBolt();
+
+ // create a profile measurement
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withEntity("entity1")
+ .withProfileName("profile1")
+ .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+ .withProfileValue(22);
+
+ // create a mock that returns the profile measurement above
+ MessageDistributor distributor = mock(MessageDistributor.class);
+ when(distributor.flush()).thenReturn(Collections.singletonList(m));
+ bolt.withMessageDistributor(distributor);
+
+ // no flush signal
+ flushSignal.setFlushNow(false);
+
+ // execute the bolt
+ Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
+ TupleWindow tupleWindow = createWindow(tuple1);
+ bolt.execute(tupleWindow);
+
+ // nothing should have been emitted
+ getProfileMeasurements(outputCollector, 0);
}
/**
- * Create a ProfileBuilderBolt to test
+ * A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each
+ * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
*/
- private ProfileBuilderBolt createBolt() throws IOException {
+ @Test
+ public void testEmitters() throws Exception {
+
+ // defines the zk configurations accessible from the bolt
+ ProfilerConfigurations configurations = new ProfilerConfigurations();
+ configurations.updateGlobalConfig(Collections.emptyMap());
+
+ // create the bolt with 3 destinations
+ ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+ .withProfileTimeToLive(30, TimeUnit.MINUTES)
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .withMaxNumberOfRoutes(Long.MAX_VALUE)
+ .withZookeeperClient(client)
+ .withZookeeperCache(cache)
+ .withEmitter(new TestEmitter("destination1"))
+ .withEmitter(new TestEmitter("destination2"))
+ .withEmitter(new TestEmitter("destination3"))
+ .withProfilerConfigurations(configurations)
+ .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES));
+ bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
- ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL");
- bolt.setCuratorFramework(client);
- bolt.setZKCache(cache);
- bolt.withPeriodDuration(10, TimeUnit.MINUTES);
- bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
+ // signal the bolt to flush
+ bolt.withFlushSignal(flushSignal);
+ flushSignal.setFlushNow(true);
- // define the valid destinations for the profiler
- bolt.withDestinationHandler(new HBaseDestinationHandler());
- bolt.withDestinationHandler(new KafkaDestinationHandler());
+ // execute the bolt
+ Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis());
+ TupleWindow window = createWindow(tuple1);
+ bolt.execute(window);
- bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
- return bolt;
+ // validate measurements emitted to each
+ verify(outputCollector, times(1)).emit(eq("destination1"), any());
+ verify(outputCollector, times(1)).emit(eq("destination2"), any());
+ verify(outputCollector, times(1)).emit(eq("destination3"), any());
}
- /**
- * The bolt should create a ProfileBuilder to manage a profile.
- */
@Test
- public void testCreateProfileBuilder() throws Exception {
+ public void testFlushExpiredWithTick() throws Exception {
ProfileBuilderBolt bolt = createBolt();
- ProfileConfig definition = createDefinition(profileOne);
- String entity = (String) messageOne.get("ip_src_addr");
- Tuple tupleOne = createTuple(entity, messageOne, definition);
- // execute - send two tuples with different entities
- bolt.execute(tupleOne);
+ // create a mock
+ MessageDistributor distributor = mock(MessageDistributor.class);
+ bolt.withMessageDistributor(distributor);
+
+ // tell the bolt to flush on the first window
+ flushSignal.setFlushNow(true);
- // validate - 1 messages applied
- MessageRoute route = new MessageRoute(definition, entity);
- ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(route, Context.EMPTY_CONTEXT());
- assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class));
+ // execute the bolt; include a tick tuple in the window
+ Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
+ TupleWindow tupleWindow = createWindow(tuple1, mockTickTuple());
+ bolt.execute(tupleWindow);
+
+ // ensure the expired profiles were flushed when the tick tuple was received
+ verify(distributor).flushExpired();
}
- /**
- * This test creates two different messages, with different entities that are applied to
- * the same profile. The bolt should create separate ProfileBuilder objects to handle each
- * profile/entity pair.
- */
@Test
- public void testCreateProfileBuilderForEachEntity() throws Exception {
+ public void testFlushExpiredWithNoTick() throws Exception {
- // setup
ProfileBuilderBolt bolt = createBolt();
- ProfileConfig definition = createDefinition(profileOne);
-
- // apply a message to the profile
- String entityOne = (String) messageOne.get("ip_src_addr");
- Tuple tupleOne = createTuple(entityOne, messageOne, definition);
- bolt.execute(tupleOne);
- bolt.execute(tupleOne);
-
- // apply a different message (with different entity) to the same profile
- String entityTwo = (String) messageTwo.get("ip_src_addr");
- Tuple tupleTwo = createTuple(entityTwo, messageTwo, definition);
- bolt.execute(tupleTwo);
-
- // validate - 2 messages applied
- MessageRoute routeOne = new MessageRoute(definition, entityOne);
- ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(routeOne, Context.EMPTY_CONTEXT());
- assertTrue(builderOne.isInitialized());
- assertEquals(2, (int) convert(builderOne.valueOf("x"), Integer.class));
-
- // validate - 1 message applied
- MessageRoute routeTwo = new MessageRoute(definition, entityTwo);
- ProfileBuilder builderTwo = bolt.getMessageDistributor().getBuilder(routeTwo, Context.EMPTY_CONTEXT());
- assertTrue(builderTwo.isInitialized());
- assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class));
-
- assertNotSame(builderOne, builderTwo);
+
+ // create a mock
+ MessageDistributor distributor = mock(MessageDistributor.class);
+ bolt.withMessageDistributor(distributor);
+
+ // tell the bolt to flush on the first window
+ flushSignal.setFlushNow(true);
+
+ // execute the bolt; NO tick tuple
+ Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
+ TupleWindow tupleWindow = createWindow(tuple1);
+ bolt.execute(tupleWindow);
+
+ // there was no tick tuple; the expired profiles should NOT have been flushed
+ verify(distributor, times(0)).flushExpired();
}
/**
- * The bolt should create separate ProfileBuilder objects to handle each
- * profile/entity pair.
+ * Creates a mock tick tuple to use for testing.
+ * @return A mock tick tuple.
*/
- @Test
- public void testCreateProfileBuilderForEachProfile() throws Exception {
+ private Tuple mockTickTuple() {
- // setup - apply one message to different profile definitions
- ProfileBuilderBolt bolt = createBolt();
- String entity = (String) messageOne.get("ip_src_addr");
-
- // apply a message to the first profile
- ProfileConfig definitionOne = createDefinition(profileOne);
- Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
- bolt.execute(tupleOne);
-
- // apply the same message to the second profile
- ProfileConfig definitionTwo = createDefinition(profileTwo);
- Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo);
- bolt.execute(tupleTwo);
-
- // validate - 1 message applied
- MessageRoute routeOne = new MessageRoute(definitionOne, entity);
- ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(routeOne, Context.EMPTY_CONTEXT());
- assertTrue(builderOne.isInitialized());
- assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class));
-
- // validate - 1 message applied
- MessageRoute routeTwo = new MessageRoute(definitionTwo, entity);
- ProfileBuilder builderTwo = bolt.getMessageDistributor().getBuilder(routeTwo, Context.EMPTY_CONTEXT());
- assertTrue(builderTwo.isInitialized());
- assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class));
-
- assertNotSame(builderOne, builderTwo);
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getSourceComponent()).thenReturn("__system");
+ when(tuple.getSourceStreamId()).thenReturn("__tick");
+
+ return tuple;
}
/**
- * A ProfileMeasurement is build for each profile/entity pair. A measurement for each profile/entity
- * pair should be emitted.
+ * Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
+ *
+ * @param collector The Storm output collector.
+ * @param expected The number of measurements expected.
+ * @return A list of ProfileMeasurement(s).
*/
- @Test
- public void testEmitMeasurements() throws Exception {
-
- // setup
- ProfileBuilderBolt bolt = createBolt();
- final String entity = (String) messageOne.get("ip_src_addr");
+ private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) {
- // apply the message to the first profile
- ProfileConfig definitionOne = createDefinition(profileOne);
- Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
- bolt.execute(tupleOne);
+ // the 'streamId' is defined by the emitter being used by the bolt
+ final String streamId = emitter.getStreamId();
- // apply the same message to the second profile
- ProfileConfig definitionTwo = createDefinition(profileTwo);
- Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo);
- bolt.execute(tupleTwo);
+ // capture the emitted tuple(s)
+ ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(expected))
+ .emit(eq(streamId), argCaptor.capture());
- // execute - the tick tuple triggers a flush of the profile
- bolt.execute(mockTickTuple());
+ // return the profile measurements that were emitted
+ return argCaptor.getAllValues()
+ .stream()
+ .map(val -> (ProfileMeasurement) val.get(0))
+ .collect(Collectors.toList());
+ }
- // capture the ProfileMeasurement that should be emitted
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ /**
+ * Create a tuple that will contain the message, its timestamp, the entity name, and profile definition.
+ * @param entity The entity name
+ * @param message The telemetry message.
+ * @param profile The profile definition.
+ */
+ private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) {
- // validate emitted measurements for hbase
- verify(outputCollector, atLeastOnce()).emit(eq("hbase"), arg.capture());
- for (Values value : arg.getAllValues()) {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message);
+ when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp);
+ when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity);
+ when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile);
- ProfileMeasurement measurement = (ProfileMeasurement) value.get(0);
- ProfileConfig definition = measurement.getDefinition();
+ return tuple;
+ }
- if (StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) {
+ /**
+ * Create a ProfileBuilderBolt to test.
+ * @return A {@link ProfileBuilderBolt} to test.
+ */
+ private ProfileBuilderBolt createBolt() throws IOException {
- // validate measurement emitted for profile two
- assertEquals(definitionTwo, definition);
- assertEquals(entity, measurement.getEntity());
- assertEquals(definitionTwo.getProfile(), measurement.getProfileName());
- assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
+ return createBolt(30, TimeUnit.SECONDS);
+ }
- } else if (StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) {
+ /**
+ * Create a ProfileBuilderBolt to test.
+ *
+ * @param windowDuration The event window duration.
+ * @param windowDurationUnits The units of the event window duration.
+ * @return A {@link ProfileBuilderBolt} to test.
+ */
+ private ProfileBuilderBolt createBolt(int windowDuration, TimeUnit windowDurationUnits) throws IOException {
+
+ // defines the zk configurations accessible from the bolt
+ ProfilerConfigurations configurations = new ProfilerConfigurations();
+ configurations.updateGlobalConfig(Collections.emptyMap());
+
+ emitter = new HBaseEmitter();
+ ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+ .withProfileTimeToLive(30, TimeUnit.MINUTES)
+ .withMaxNumberOfRoutes(Long.MAX_VALUE)
+ .withZookeeperClient(client)
+ .withZookeeperCache(cache)
+ .withEmitter(emitter)
+ .withProfilerConfigurations(configurations)
+ .withPeriodDuration(1, TimeUnit.MINUTES)
+ .withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, windowDurationUnits));
+ bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
- // validate measurement emitted for profile one
- assertEquals(definitionOne, definition);
- assertEquals(entity, measurement.getEntity());
- assertEquals(definitionOne.getProfile(), measurement.getProfileName());
- assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
+ // set the flush signal AFTER calling 'prepare'
+ bolt.withFlushSignal(flushSignal);
- } else {
- fail();
- }
- }
+ return bolt;
}
/**
- * A ProfileMeasurement is build for each profile/entity pair. The measurement should be emitted to each
- * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
+ * Creates a mock TupleWindow containing multiple tuples.
+ * @param tuples The tuples to add to the window.
*/
- @Test
- public void testDestinationHandlers() throws Exception {
+ private TupleWindow createWindow(Tuple... tuples) {
- // setup
- ProfileBuilderBolt bolt = createBolt();
- ProfileConfig definitionOne = createDefinition(profileOne);
+ TupleWindow window = mock(TupleWindow.class);
+ when(window.get()).thenReturn(Arrays.asList(tuples));
+ return window;
+ }
- // apply the message to the first profile
- final String entity = (String) messageOne.get("ip_src_addr");
- Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
- bolt.execute(tupleOne);
+ /**
+ * An implementation for testing purposes only.
+ */
+ private class TestEmitter implements ProfileMeasurementEmitter {
- // trigger a flush of the profile
- bolt.execute(mockTickTuple());
+ private String streamId;
- // capture the values that should be emitted
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ public TestEmitter(String streamId) {
+ this.streamId = streamId;
+ }
- // validate measurements emitted to HBase
- verify(outputCollector, times(1)).emit(eq("hbase"), arg.capture());
- assertTrue(arg.getValue().get(0) instanceof ProfileMeasurement);
+ @Override
+ public String getStreamId() {
+ return streamId;
+ }
- // validate measurements emitted to Kafka
- verify(outputCollector, times(1)).emit(eq("kafka"), arg.capture());
- assertTrue(arg.getValue().get(0) instanceof JSONObject);
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream(getStreamId(), new Fields("measurement"));
+ }
+
+ @Override
+ public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+ collector.emit(getStreamId(), new Values(measurement));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
index 17d6827..04c774c 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -20,11 +20,11 @@
package org.apache.metron.profiler.bolt;
-import org.apache.metron.common.configuration.profiler.ProfileResult;
-import org.apache.storm.tuple.Tuple;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfileResult;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.storm.tuple.Tuple;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -32,10 +32,8 @@ import org.junit.Test;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index beab8d5..bf81923 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -21,7 +21,10 @@
package org.apache.metron.profiler.bolt;
import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.clock.FixedClockFactory;
+import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
@@ -31,12 +34,15 @@ import org.json.simple.parser.ParseException;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import java.util.HashMap;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.refEq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests the ProfileSplitterBolt.
@@ -47,7 +53,9 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
* {
* "ip_src_addr": "10.0.0.1",
* "ip_dst_addr": "10.0.0.20",
- * "protocol": "HTTP"
+ * "protocol": "HTTP",
+ * "timestamp.custom": 2222222222222,
+ * "timestamp.string": "3333333333333"
* }
*/
@Multiline
@@ -68,7 +76,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
* }
*/
@Multiline
- private String onlyIfTrue;
+ private String profileWithOnlyIfTrue;
/**
* {
@@ -85,7 +93,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
* }
*/
@Multiline
- private String onlyIfFalse;
+ private String profileWithOnlyIfFalse;
/**
* {
@@ -101,7 +109,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
* }
*/
@Multiline
- private String onlyIfMissing;
+ private String profileWithOnlyIfMissing;
/**
* {
@@ -118,9 +126,89 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
* }
*/
@Multiline
- private String onlyIfInvalid;
+ private String profileWithOnlyIfInvalid;
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": {},
+ * "update": {},
+ * "result": "2"
+ * }
+ * ],
+ * "timestampField": "timestamp.custom"
+ * }
+ */
+ @Multiline
+ private String profileUsingCustomTimestampField;
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": {},
+ * "update": {},
+ * "result": "2"
+ * }
+ * ],
+ * "timestampField": "timestamp.missing"
+ * }
+ */
+ @Multiline
+ private String profileUsingMissingTimestampField;
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": {},
+ * "update": {},
+ * "result": "2"
+ * }
+ * ],
+ * "timestampField": "timestamp.string"
+ * }
+ */
+ @Multiline
+ private String profileUsingStringTimestampField;
+
+ /**
+ * {
+ * "profiles": [
+ * ]
+ * }
+ */
+ @Multiline
+ private String noProfilesDefined;
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile1",
+ * "foreach": "'global'",
+ * "result": "1"
+ * },
+ * {
+ * "profile": "profile2",
+ * "foreach": "'global'",
+ * "result": "2"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private String twoProfilesDefined;
private JSONObject message;
+ private long timestamp = 3333333;
@Before
public void setup() throws ParseException {
@@ -134,17 +222,83 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
}
/**
- * Create a ProfileSplitterBolt to test
+ * Ensure that a tuple with the correct fields is emitted to downstream bolts
+ * when a profile is defined.
*/
- private ProfileSplitterBolt createBolt(String profilerConfig) throws IOException {
+ @Test
+ public void testEmitTupleWithOneProfile() throws Exception {
- ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
- bolt.setCuratorFramework(client);
- bolt.setZKCache(cache);
- bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8"));
- bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+ // setup the bolt and execute a tuple
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+ ProfileSplitterBolt bolt = createBolt(config);
+ bolt.execute(tuple);
- return bolt;
+ // the expected tuple fields
+ String expectedEntity = "10.0.0.1";
+ ProfileConfig expectedConfig = config.getProfiles().get(0);
+ Values expected = new Values(message, timestamp, expectedEntity, expectedConfig);
+
+ // a tuple should be emitted for the downstream profile builder
+ verify(outputCollector, times(1))
+ .emit(eq(tuple), eq(expected));
+
+ // the original tuple should be ack'd
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
+ }
+
+ /**
+ * If there are two profiles that need the same message, then two tuples should
+ * be emitted. One tuple for each profile.
+ */
+ @Test
+ public void testEmitTupleWithTwoProfiles() throws Exception {
+
+ // setup the bolt and execute a tuple
+ ProfilerConfig config = toProfilerConfig(twoProfilesDefined);
+ ProfileSplitterBolt bolt = createBolt(config);
+ bolt.execute(tuple);
+
+ // the expected tuple fields
+ final String expectedEntity = "global";
+ {
+ // a tuple should be emitted for the first profile
+ ProfileConfig profile1 = config.getProfiles().get(0);
+ Values expected = new Values(message, timestamp, expectedEntity, profile1);
+ verify(outputCollector, times(1))
+ .emit(eq(tuple), eq(expected));
+ }
+ {
+ // a tuple should be emitted for the second profile
+ ProfileConfig profile2 = config.getProfiles().get(1);
+ Values expected = new Values(message, timestamp, expectedEntity, profile2);
+ verify(outputCollector, times(1))
+ .emit(eq(tuple), eq(expected));
+ }
+
+ // the original tuple should be ack'd
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
+ }
+
+ /**
+ * No tuples should be emitted, if no profiles are defined.
+ */
+ @Test
+ public void testNoProfilesDefined() throws Exception {
+
+ // setup the bolt and execute a tuple
+ ProfilerConfig config = toProfilerConfig(noProfilesDefined);
+ ProfileSplitterBolt bolt = createBolt(config);
+ bolt.execute(tuple);
+
+ // no tuple should be emitted
+ verify(outputCollector, times(0))
+ .emit(any(Tuple.class), any());
+
+ // the original tuple should be ack'd
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
}
/**
@@ -154,17 +308,17 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
@Test
public void testOnlyIfTrue() throws Exception {
- // setup
- ProfileSplitterBolt bolt = createBolt(onlyIfTrue);
-
- // execute
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+ ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should be emitted for the downstream profile builder
- verify(outputCollector, times(1)).emit(refEq(tuple), any(Values.class));
+ verify(outputCollector, times(1))
+ .emit(eq(tuple), any(Values.class));
// the original tuple should be ack'd
- verify(outputCollector, times(1)).ack(tuple);
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
}
/**
@@ -174,17 +328,17 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
@Test
public void testOnlyIfMissing() throws Exception {
- // setup
- ProfileSplitterBolt bolt = createBolt(onlyIfMissing);
-
- // execute
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing);
+ ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should be emitted for the downstream profile builder
- verify(outputCollector, times(1)).emit(refEq(tuple), any(Values.class));
+ verify(outputCollector, times(1))
+ .emit(eq(tuple), any(Values.class));
// the original tuple should be ack'd
- verify(outputCollector, times(1)).ack(tuple);
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
}
/**
@@ -194,36 +348,45 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
@Test
public void testOnlyIfFalse() throws Exception {
- // setup
- ProfileSplitterBolt bolt = createBolt(onlyIfFalse);
-
- // execute
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse);
+ ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should NOT be emitted for the downstream profile builder
- verify(outputCollector, times(0)).emit(any(Values.class));
+ verify(outputCollector, times(0))
+ .emit(any());
// the original tuple should be ack'd
- verify(outputCollector, times(1)).ack(tuple);
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
}
/**
- * The entity associated with a ProfileMeasurement can be defined using a variable that is resolved
- * via Stella. In this case the entity is defined as 'ip_src_addr' which is resolved to
- * '10.0.0.1' based on the data contained within the message.
+ * The entity associated with a profile is defined with a Stellar expression. That expression
+ * can refer to any field within the message.
+ *
+ * In this case the entity is defined as 'ip_src_addr' which is resolved to '10.0.0.1' based on
+ * the data contained within the message.
*/
@Test
public void testResolveEntityName() throws Exception {
- // setup
- ProfileSplitterBolt bolt = createBolt(onlyIfTrue);
-
- // execute
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+ ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
- // verify - the entity name comes from variable resolution in stella
+ // expected values
String expectedEntity = "10.0.0.1";
- verify(outputCollector, times(1)).emit(any(Tuple.class), refEq(new Values(expectedEntity, onlyIfTrue, message)));
+ ProfileConfig expectedConfig = config.getProfiles().get(0);
+ Values expected = new Values(message, timestamp, expectedEntity, expectedConfig);
+
+ // a tuple should be emitted for the downstream profile builder
+ verify(outputCollector, times(1))
+ .emit(eq(tuple), eq(expected));
+
+ // the original tuple should be ack'd
+ verify(outputCollector, times(1))
+ .ack(eq(tuple));
}
/**
@@ -232,11 +395,42 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
@Test
public void testOnlyIfInvalid() throws Exception {
- // setup
- ProfileSplitterBolt bolt = createBolt(onlyIfInvalid);
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
+ ProfileSplitterBolt bolt = createBolt(config);
bolt.execute(tuple);
// a tuple should NOT be emitted for the downstream profile builder
- verify(outputCollector, times(0)).emit(any(Values.class));
+ verify(outputCollector, times(0))
+ .emit(any(Values.class));
+ }
+
+ /**
+ * Creates a ProfilerConfig based on a string containing JSON.
+ *
+ * @param configAsJSON The config as JSON.
+ * @return The ProfilerConfig.
+ * @throws Exception
+ */
+ private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception {
+ InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
+ return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
}
+
+ /**
+ * Create a ProfileSplitterBolt to test
+ */
+ private ProfileSplitterBolt createBolt(ProfilerConfig config) throws Exception {
+
+ ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
+ bolt.setCuratorFramework(client);
+ bolt.setZKCache(cache);
+ bolt.getConfigurations().updateProfilerConfig(config);
+ bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+
+ // set the clock factory AFTER calling prepare to use the fixed clock factory
+ bolt.setClockFactory(new FixedClockFactory(timestamp));
+
+ return bolt;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
new file mode 100644
index 0000000..7e1628b
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.integration;
+
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Enables simple creation of telemetry messages for testing.
+ */
+public class MessageBuilder {
+
+ private Map<Object, Object> fields;
+
+ /**
+ * Create a new {@link MessageBuilder}.
+ */
+ public MessageBuilder() {
+ this.fields = new HashMap<>();
+ }
+
+ /**
+ * Adds all of the fields from a message to this message.
+ *
+ * @param prototype The other message that is treated as a prototype.
+ * @return A {@link MessageBuilder}
+ */
+ public MessageBuilder withFields(JSONObject prototype) {
+ prototype.forEach((key, val) -> this.fields.put(key, val));
+ return this;
+ }
+
+ /**
+ * Adds a field to the message.
+ *
+ * @param key The field name.
+ * @param value The field value.
+ * @return A {@link MessageBuilder}
+ */
+ public MessageBuilder withField(String key, Object value) {
+ this.fields.put(key, value);
+ return this;
+ }
+
+ /**
+ * Build the message.
+ *
+ * <p>This should be called after defining all of the message fields.
+ *
+ * @return The message as a {@link JSONObject}.
+ */
+ public JSONObject build() {
+ return new JSONObject(fields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 0d1b465..c48a3e9 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -28,15 +28,18 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
import org.apache.metron.statistics.OnlineStatisticsProvider;
import org.junit.After;
@@ -49,15 +52,15 @@ import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.concurrent.TimeUnit;
import static com.google.code.tempusfugit.temporal.Duration.seconds;
import static com.google.code.tempusfugit.temporal.Timeout.timeout;
import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
/**
* An integration test of the Profiler topology.
@@ -105,7 +108,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static FluxTopologyComponent fluxComponent;
private static KafkaComponent kafkaComponent;
private static ConfigUploadComponent configUploadComponent;
- private static List<byte[]> input;
private static ComponentRunner runner;
private static MockHTable profilerTable;
@@ -114,7 +116,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static final double epsilon = 0.001;
private static final String inputTopic = Constants.INDEXING_TOPIC;
private static final String outputTopic = "profiles";
+ private static final int saltDivisor = 10;
+ private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5);
+ private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5);
+ private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15);
+ private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
+ private static final long maxRoutesPerBolt = 100000;
/**
* Tests the first example contained within the README.
@@ -122,22 +130,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Test
public void testExample1() throws Exception {
- update(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, input);
+ kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+ kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+ kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
// verify - ensure the profile is being persisted
waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
- timeout(seconds(90)));
+ timeout(seconds(180)));
// verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+ columnBuilder.getColumnQualifier("value"), Double.class);
- // verify - there are 5 'HTTP' each with 390 bytes
+ // verify - there are 3 'HTTP' each with 390 bytes
Assert.assertTrue(actuals.stream().anyMatch(val ->
- MathUtils.equals(390.0 * 5, val, epsilon)
+ MathUtils.equals(390.0 * 3, val, epsilon)
));
}
@@ -147,11 +158,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Test
public void testExample2() throws Exception {
- update(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, input);
+ kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+ kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+ kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
// expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
final int expected = 2;
@@ -161,16 +174,17 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
timeout(seconds(90)));
// verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+ columnBuilder.getColumnQualifier("value"), Double.class);
- // verify - 10.0.0.3 -> 1/6
- Assert.assertTrue( "Could not find a value near 1/6. Actual values read are are: " + Joiner.on(",").join(actuals)
- , actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, epsilon)
+ // verify - 10.0.0.3 -> 1/4
+ Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals),
+ actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon)
));
- // verify - 10.0.0.2 -> 6/1
- Assert.assertTrue("Could not find a value near 6. Actual values read are are: " + Joiner.on(",").join(actuals)
- ,actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, epsilon)
+ // verify - 10.0.0.2 -> 4/1
+ Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals),
+ actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon)
));
}
@@ -180,22 +194,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Test
public void testExample3() throws Exception {
- update(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, input);
+ kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+ kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+ kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
// verify - ensure the profile is being persisted
waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
timeout(seconds(90)));
// verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+ columnBuilder.getColumnQualifier("value"), Double.class);
// verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
- Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals)
- , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
+ Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
+ actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
));
}
@@ -205,11 +222,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Test
public void testExample4() throws Exception {
- update(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, input);
+ kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+ kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+ kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
// verify - ensure the profile is being persisted
waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -220,34 +239,109 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class);
// verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
- Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals)
- , actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
+ Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
+ actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
));
}
@Test
public void testPercentiles() throws Exception {
- update(TEST_RESOURCES + "/config/zookeeper/percentiles");
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/percentiles");
+
+ // start the topology and write test messages to kafka
+ fluxComponent.submitTopology();
+ kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+ kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+ kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
+
+ // verify - ensure the profile is being persisted
+ waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
+ timeout(seconds(90)));
+
+ List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+ columnBuilder.getColumnQualifier("value"), Double.class);
+ // verify - the 70th percentile of 3 x 20s = 20.0
+ Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
+ actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
+ }
+
+ /**
+ * The Profiler can optionally perform event time processing. With event time processing,
+ * the Profiler uses timestamps contained in the source telemetry.
+ *
+ * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler
+ * from which field the timestamp should be extracted.
+ */
+ @Test
+ public void testEventTimeProcessing() throws Exception {
+
+ // constants used for the test
+ final long startAt = 10;
+ final String entity = "10.0.0.1";
+ final String profileName = "event-time-test";
+
+ // create some messages that contain a timestamp - a really old timestamp; close to 1970
+ String message1 = new MessageBuilder()
+ .withField("ip_src_addr", entity)
+ .withField("timestamp", startAt)
+ .build()
+ .toJSONString();
+
+ String message2 = new MessageBuilder()
+ .withField("ip_src_addr", entity)
+ .withField("timestamp", startAt + 100)
+ .build()
+ .toJSONString();
+
+ uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
- kafkaComponent.writeMessages(inputTopic, input);
+ kafkaComponent.writeMessages(inputTopic, message1, message2);
// verify - ensure the profile is being persisted
waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
timeout(seconds(90)));
- List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+ List<Put> puts = profilerTable.getPutLog();
+ assertEquals(1, puts.size());
+
+ // inspect the row key to ensure the profiler used event time correctly. the period
+ // embedded in the row key should be derived from the timestamps in the source telemetry
+ byte[] expectedRowKey = generateExpectedRowKey(profileName, entity, startAt);
+ byte[] actualRowKey = puts.get(0).getRow();
+ String msg = String.format("expected '%s', got '%s'",
+ new String(expectedRowKey, "UTF-8"),
+ new String(actualRowKey, "UTF-8"));
+ assertArrayEquals(msg, expectedRowKey, actualRowKey);
+ }
- // verify - the 70th percentile of 5 x 20s = 20.0
- Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals)
- , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
+ /**
+ * Generates the expected row key.
+ *
+ * @param profileName The name of the profile.
+ * @param entity The entity.
+ * @param whenMillis A timestamp in epoch milliseconds.
+ * @return A row key.
+ */
+ private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) {
+
+ // only the profile name, entity, and period are used to generate the row key
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName(profileName)
+ .withEntity(entity)
+ .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS);
+
+ // build the row key
+ RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS);
+ return rowKeyBuilder.rowKey(measurement);
}
/**
* Reads a value written by the Profiler.
+ *
* @param family The column family.
* @param qualifier The column qualifier.
* @param clazz The expected type of the value.
@@ -258,7 +352,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
List<T> results = new ArrayList<>();
for(Put put: puts) {
- for(Cell cell: put.get(Bytes.toBytes(family), qualifier)) {
+ List<Cell> cells = put.get(Bytes.toBytes(family), qualifier);
+ for(Cell cell : cells) {
T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
results.add(value);
}
@@ -271,39 +366,41 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
public static void setupBeforeClass() throws UnableToStartException {
columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
- List<String> inputNew = Stream.of(message1, message2, message3)
- .map(m -> Collections.nCopies(5, m))
- .flatMap(l -> l.stream())
- .collect(Collectors.toList());
-
- // create input messages for the profiler to consume
- input = Stream.of(message1, message2, message3)
- .map(Bytes::toBytes)
- .map(m -> Collections.nCopies(5, m))
- .flatMap(l -> l.stream())
- .collect(Collectors.toList());
-
// storm topology properties
final Properties topologyProperties = new Properties() {{
- setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+
+ // storm settings
setProperty("profiler.workers", "1");
setProperty("profiler.executors", "0");
+ setProperty("storm.auto.credentials", "[]");
+ setProperty("topology.auto-credentials", "[]");
+ setProperty("topology.message.timeout.secs", "60");
+ setProperty("topology.max.spout.pending", "100000");
+
+ // kafka settings
setProperty("profiler.input.topic", inputTopic);
setProperty("profiler.output.topic", outputTopic);
- setProperty("profiler.period.duration", "20");
- setProperty("profiler.period.duration.units", "SECONDS");
- setProperty("profiler.ttl", "30");
- setProperty("profiler.ttl.units", "MINUTES");
- setProperty("profiler.hbase.salt.divisor", "10");
+ setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+ setProperty("kafka.security.protocol", "PLAINTEXT");
+
+ // hbase settings
+ setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor));
setProperty("profiler.hbase.table", tableName);
setProperty("profiler.hbase.column.family", columnFamily);
setProperty("profiler.hbase.batch", "10");
setProperty("profiler.hbase.flush.interval.seconds", "1");
- setProperty("profiler.profile.ttl", "20");
setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName());
- setProperty("storm.auto.credentials", "[]");
- setProperty("kafka.security.protocol", "PLAINTEXT");
- setProperty("topology.auto-credentials", "[]");
+
+ // profile settings
+ setProperty("profiler.period.duration", Long.toString(periodDurationMillis));
+ setProperty("profiler.period.duration.units", "MILLISECONDS");
+ setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis));
+ setProperty("profiler.ttl.units", "MILLISECONDS");
+ setProperty("profiler.window.duration", Long.toString(windowDurationMillis));
+ setProperty("profiler.window.duration.units", "MILLISECONDS");
+ setProperty("profiler.window.lag", Long.toString(windowLagMillis));
+ setProperty("profiler.window.lag.units", "MILLISECONDS");
+ setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt));
}};
// create the mock table
@@ -311,7 +408,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
zkComponent = getZKServerComponent(topologyProperties);
- // create the input topic
+ // create the input and output topics
kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
new KafkaComponent.Topic(inputTopic, 1),
new KafkaComponent.Topic(outputTopic, 1)));
@@ -340,12 +437,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
runner.start();
}
- public void update(String path) throws Exception {
- configUploadComponent.withGlobalConfiguration(path)
- .withProfilerConfiguration(path);
- configUploadComponent.update();
- }
-
@AfterClass
public static void tearDownAfterClass() throws Exception {
MockHBaseTableProvider.clear();
@@ -368,4 +459,16 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
runner.reset();
}
}
-}
\ No newline at end of file
+
+ /**
+ * Uploads config values to Zookeeper.
+ * @param path The path on the local filesystem to the config values.
+ * @throws Exception If uploading the configuration fails.
+ */
+ public void uploadConfig(String path) throws Exception {
+ configUploadComponent
+ .withGlobalConfiguration(path)
+ .withProfilerConfiguration(path)
+ .update();
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
index c7f6ce2..8546b56 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
@@ -57,9 +57,33 @@
<type>value-list</type>
<entries>
<entry>
- <value>DAYS</value>
+ <value>HOURS</value>
</entry>
<entry>
+ <value>MINUTES</value>
+ </entry>
+ <entry>
+ <value>SECONDS</value>
+ </entry>
+ </entries>
+ <selection-cardinality>1</selection-cardinality>
+ </value-attributes>
+ </property>
+ <property>
+ <name>profiler_window_duration</name>
+ <value>30</value>
+ <description>The duration of each profile window. This value should be defined along with profiler.window.duration.units.</description>
+ <display-name>Window Duration</display-name>
+ </property>
+ <property>
+ <name>profiler_window_units</name>
+ <value>SECONDS</value>
+ <description>The units used to specify the profiler.window.duration. This value should be defined along with profiler.window.duration.</description>
+ <display-name>Window Units</display-name>
+ <value-attributes>
+ <type>value-list</type>
+ <entries>
+ <entry>
<value>HOURS</value>
</entry>
<entry>
@@ -71,7 +95,6 @@
</entries>
<selection-cardinality>1</selection-cardinality>
</value-attributes>
-
</property>
<property>
<name>profiler_ttl</name>
@@ -104,8 +127,54 @@
</entries>
<selection-cardinality>1</selection-cardinality>
</value-attributes>
-
-
+ </property>
+ <property>
+ <name>profiler_window_lag</name>
+ <value>1</value>
+ <description>The maximum time lag for timestamps. Timestamps cannot arrive out-of-order by more than this amount.</description>
+ <display-name>Window Time Lag</display-name>
+ </property>
+ <property>
+ <name>profiler_window_lag_units</name>
+ <value>MINUTES</value>
+ <description>The units used to specify the Window Time Lag.</description>
+ <display-name>Window Lag Units</display-name>
+ <value-attributes>
+ <type>value-list</type>
+ <entries>
+ <entry>
+ <value>HOURS</value>
+ </entry>
+ <entry>
+ <value>MINUTES</value>
+ </entry>
+ <entry>
+ <value>SECONDS</value>
+ </entry>
+ </entries>
+ <selection-cardinality>1</selection-cardinality>
+ </value-attributes>
+ </property>
+ <property>
+ <name>profiler_topology_message_timeout_secs</name>
+ <description>The maximum amount of time a message has to complete before it is considered failed.</description>
+ <display-name>Profiler Topology Message Timeout</display-name>
+ <value>900</value>
+ </property>
+ <property>
+ <name>profiler_topology_max_spout_pending</name>
+ <description>Profiler Topology Spout Max Pending Tuples</description>
+ <display-name>Spout Max Pending Tuples</display-name>
+ <value/>
+ <value-attributes>
+ <empty-value-valid>true</empty-value-valid>
+ </value-attributes>
+ </property>
+ <property>
+ <name>profiler_max_routes_per_bolt</name>
+ <value>100000</value>
+ <description>The max number of routes allowed per bolt. The number of routes increases as the number of profiles and entities increases.</description>
+ <display-name>Max Routes Per Bolt</display-name>
</property>
<property>
<name>profiler_hbase_table</name>