You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 15:00:09 UTC
[39/52] [abbrv] metron git commit: METRON-1494 Profiler Emits
Messages to Kafka When Not Needed (nickwallen) closes apache/metron#967
METRON-1494 Profiler Emits Messages to Kafka When Not Needed (nickwallen) closes apache/metron#967
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/62d1a1bf
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/62d1a1bf
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/62d1a1bf
Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 62d1a1bf7e8b9b3ee2f260c358719ea5080c9045
Parents: 438893b
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Apr 11 17:57:09 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Apr 11 17:57:09 2018 -0400
----------------------------------------------------------------------
.../metron/profiler/DefaultProfileBuilder.java | 5 +
.../bolt/FixedFrequencyFlushSignal.java | 13 +-
.../metron/profiler/bolt/HBaseEmitter.java | 12 +-
.../metron/profiler/bolt/KafkaEmitter.java | 78 +++++--
.../profiler/bolt/ProfileSplitterBolt.java | 5 +
.../metron/profiler/bolt/HBaseEmitterTest.java | 120 +++++++++++
.../metron/profiler/bolt/KafkaEmitterTest.java | 201 +++++++++++++------
7 files changed, 358 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
index 4b564c9..66034ac 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
@@ -124,8 +124,13 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
*/
@Override
public void apply(JSONObject message, long timestamp) {
+ LOG.debug("Applying message to profile; profile={}, entity={}, timestamp={}",
+ profileName, entity, timestamp);
+
try {
if (!isInitialized()) {
+ LOG.debug("Initializing profile; profile={}, entity={}, timestamp={}",
+ profileName, entity, timestamp);
// execute each 'init' expression
assign(definition.getInit(), message, "init");
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
index b9f57dd..8c0a0b1 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java
@@ -94,7 +94,8 @@ public class FixedFrequencyFlushSignal implements FlushSignal {
// set the next time to flush
flushTime = currentTime + flushFrequency;
- LOG.debug("Setting flush time; flushTime={}, currentTime={}, flushFreq={}",
+ LOG.debug("Setting flush time; '{}' ms until flush; flushTime={}, currentTime={}, flushFreq={}",
+ timeToNextFlush(),
flushTime,
currentTime,
flushFrequency);
@@ -112,7 +113,7 @@ public class FixedFrequencyFlushSignal implements FlushSignal {
boolean flush = currentTime > flushTime;
LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
flush,
- flush ? 0 : (flushTime-currentTime),
+ timeToNextFlush(),
currentTime,
flushTime);
@@ -123,4 +124,12 @@ public class FixedFrequencyFlushSignal implements FlushSignal {
public long currentTimeMillis() {
return currentTime;
}
+
+ /**
+ * Returns the number of milliseconds to the next flush.
+ * @return The time left until the next flush.
+ */
+ private long timeToNextFlush() {
+ return Math.max(0, flushTime - currentTime);
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
index 8e1229a..e4e3552 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java
@@ -40,7 +40,7 @@ public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable {
/**
* The stream identifier used for this destination;
*/
- private String streamId = "hbase";
+ private String streamId = "hbase";
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -49,7 +49,17 @@ public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable {
@Override
public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+ // measurements are always emitted to hbase
collector.emit(getStreamId(), new Values(measurement));
+
+ LOG.debug("Emitted measurement; stream={}, profile={}, entity={}, period={}, start={}, end={}",
+ getStreamId(),
+ measurement.getProfileName(),
+ measurement.getEntity(),
+ measurement.getPeriod().getPeriod(),
+ measurement.getPeriod().getStartTimeMillis(),
+ measurement.getPeriod().getEndTimeMillis());
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
index 29d1a49..87920da 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java
@@ -19,8 +19,7 @@
package org.apache.metron.profiler.bolt;
-import java.io.Serializable;
-import java.lang.invoke.MethodHandles;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.storm.task.OutputCollector;
@@ -31,6 +30,10 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+
/**
* Responsible for emitting a {@link ProfileMeasurement} to an output stream that will
* persist data in HBase.
@@ -58,19 +61,48 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
@Override
public void emit(ProfileMeasurement measurement, OutputCollector collector) {
- JSONObject message = new JSONObject();
- message.put("profile", measurement.getDefinition().getProfile());
- message.put("entity", measurement.getEntity());
- message.put("period", measurement.getPeriod().getPeriod());
- message.put("period.start", measurement.getPeriod().getStartTimeMillis());
- message.put("period.end", measurement.getPeriod().getEndTimeMillis());
- message.put("timestamp", System.currentTimeMillis());
- message.put("source.type", sourceType);
- message.put("is_alert", "true");
+ // only need to emit, if there are triage values
+ Map<String, Object> triageValues = measurement.getTriageValues();
+ if(MapUtils.isNotEmpty(triageValues)) {
+
+ JSONObject message = createMessage(measurement);
+ appendTriageValues(measurement, message);
+ collector.emit(getStreamId(), new Values(message));
+
+ LOG.debug("Emitted measurement; stream={}, profile={}, entity={}, period={}, start={}, end={}",
+ getStreamId(),
+ measurement.getProfileName(),
+ measurement.getEntity(),
+ measurement.getPeriod().getPeriod(),
+ measurement.getPeriod().getStartTimeMillis(),
+ measurement.getPeriod().getEndTimeMillis());
+
+ } else {
+
+ LOG.debug("No triage values, nothing to emit; stream={}, profile={}, entity={}, period={}, start={}, end={}",
+ getStreamId(),
+ measurement.getProfileName(),
+ measurement.getEntity(),
+ measurement.getPeriod().getPeriod(),
+ measurement.getPeriod().getStartTimeMillis(),
+ measurement.getPeriod().getEndTimeMillis());
+ }
+ }
- // append each of the triage values to the message
- measurement.getTriageValues().forEach((key, value) -> {
+ /**
+ * Appends triage values obtained from a {@code ProfileMeasurement} to the
+ * outgoing message.
+ *
+ * @param measurement The measurement that may contain triage values.
+ * @param message The message that the triage values are appended to.
+ */
+ private void appendTriageValues(ProfileMeasurement measurement, JSONObject message) {
+
+ // for each triage value...
+ Map<String, Object> triageValues = MapUtils.emptyIfNull(measurement.getTriageValues());
+ triageValues.forEach((key, value) -> {
+ // append the triage value to the message
if(isValidType(value)) {
message.put(key, value);
@@ -83,8 +115,26 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
key));
}
});
+ }
+
+ /**
+ * Creates a message that will be emitted to Kafka.
+ *
+ * @param measurement The profile measurement used as a basis for the message.
+ * @return A message that can be emitted to Kafka.
+ */
+ private JSONObject createMessage(ProfileMeasurement measurement) {
- collector.emit(getStreamId(), new Values(message));
+ JSONObject message = new JSONObject();
+ message.put("profile", measurement.getDefinition().getProfile());
+ message.put("entity", measurement.getEntity());
+ message.put("period", measurement.getPeriod().getPeriod());
+ message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+ message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+ message.put("timestamp", System.currentTimeMillis());
+ message.put("source.type", sourceType);
+ message.put("is_alert", "true");
+ return message;
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index a92a432..f28411f 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -190,6 +190,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
Values values = createValues(message, timestamp, route);
collector.emit(input, values);
+
+ LOG.debug("Found route for message; profile={}, entity={}, timestamp={}",
+ route.getProfileDefinition().getProfile(),
+ route.getEntity(),
+ timestamp);
}
LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp);
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
new file mode 100644
index 0000000..35ca4d9
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the HBaseEmitter class.
+ */
+public class HBaseEmitterTest {
+
+ /**
+ * {
+ * "profile": "profile-one",
+ * "foreach": "ip_src_addr",
+ * "init": { "x": "0" },
+ * "update": { "x": "x + 1" },
+ * "result": "x"
+ * }
+ */
+ @Multiline
+ private String profileDefinition;
+
+ private HBaseEmitter emitter;
+ private ProfileConfig profile;
+ private OutputCollector collector;
+
+ @Before
+ public void setup() throws Exception {
+ emitter = new HBaseEmitter();
+ profile = createDefinition(profileDefinition);
+ collector = Mockito.mock(OutputCollector.class);
+ }
+
+ /**
+ * The handler should emit a message containing the result of executing
+ * the 'result/profile' expression.
+ */
+ @Test
+ public void testEmit() throws Exception {
+
+ // create a measurement that has triage values
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withDefinition(profile)
+ .withProfileValue(22);
+
+ // execute the test
+ emitter.emit(measurement, collector);
+
+ // the measurement should be emitted as-is
+ ProfileMeasurement actual = expectMeasurement(emitter, collector);
+ assertEquals(measurement, actual);
+ }
+
+ /**
+ * Verifies that the emitter does emit a {@code ProfileMeasurement}.
+ *
+ * @return The {@code ProfileMeasurement} that was emitted
+ */
+ private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, OutputCollector collector) {
+
+ ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), arg.capture());
+ Values values = arg.getValue();
+ assertTrue(values.get(0) instanceof ProfileMeasurement);
+ return (ProfileMeasurement) values.get(0);
+ }
+
+ /**
+ * Creates a profile definition based on a string of JSON.
+ * @param json The string of JSON.
+ */
+ private ProfileConfig createDefinition(String json) throws IOException {
+ return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
index b02e377..95a2d29 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
@@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -58,54 +59,128 @@ public class KafkaEmitterTest {
* "foreach": "ip_src_addr",
* "init": { "x": "0" },
* "update": { "x": "x + 1" },
- * "result": "x"
+ * "result": {
+ * "profile": "x",
+ * "triage": {
+ * "value": "x"
+ * }
+ * }
* }
*/
@Multiline
- private String profileDefinition;
+ private String profileDefinitionWithTriage;
- private KafkaEmitter handler;
+ private KafkaEmitter kafkaEmitter;
private ProfileConfig profile;
private OutputCollector collector;
@Before
public void setup() throws Exception {
- handler = new KafkaEmitter();
- profile = createDefinition(profileDefinition);
+ kafkaEmitter = new KafkaEmitter();
+ profile = createDefinition(profileDefinitionWithTriage);
collector = Mockito.mock(OutputCollector.class);
}
/**
- * The handler must serialize the ProfileMeasurement into a JSONObject.
+ * The handler should emit a message when a result/triage expression(s) has been defined.
*/
@Test
- public void testSerialization() throws Exception {
+ public void testEmit() throws Exception {
+ // create a measurement that has triage values
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withDefinition(profile)
+ .withTriageValues(Collections.singletonMap("triage-key", "triage-value"));
+
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
+
+ // a message should be emitted
+ verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any());
+ }
+
+ /**
+ * The handler should NOT emit a message when there is NO result/triage value(s).
+ */
+ @Test
+ public void testDoNotEmit() throws Exception {
+
+ // create a measurement with NO triage values
ProfileMeasurement measurement = new ProfileMeasurement()
.withProfileName("profile")
.withEntity("entity")
.withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
.withDefinition(profile);
- handler.emit(measurement, collector);
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
- // expect a JSONObject
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
+ // a message should NOT be emitted
+ verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any());
+ }
- // validate the json
- JSONObject actual = (JSONObject) values.get(0);
- assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
- assertEquals(measurement.getEntity(), actual.get("entity"));
- assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
- assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
- assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
- assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+ /**
+ * Validate that the message generated for Kafka should include the triage value.
+ */
+ @Test
+ public void testTriageValueInMessage() throws Exception {
+
+ // create a measurement that has triage values
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withDefinition(profile)
+ .withProfileName(profile.getProfile())
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withTriageValues(Collections.singletonMap("triage-key", "triage-value"));
+
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
+ JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+ // validate the core parts of the message
+ assertEquals(measurement.getProfileName(), actual.get("profile"));
+ assertEquals(measurement.getEntity(), actual.get("entity"));
+ assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertEquals("profiler", actual.get("source.type"));
assertNotNull(actual.get("timestamp"));
- assertEquals("profiler", actual.get("source.type"));
+
+ // validate that the triage value has been added
+ assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+ }
+
+ /**
+ * Validate that the message generated for Kafka can include multiple triage values.
+ */
+ @Test
+ public void testMultipleTriageValueInMessage() throws Exception {
+
+ // multiple triage values have been defined
+ Map<String, Object> triageValues = ImmutableMap.of(
+ "x", 2,
+ "y", "4",
+ "z", 6.0);
+
+ // create a measurement that has multiple triage values
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withDefinition(profile)
+ .withProfileName(profile.getProfile())
+ .withEntity("entity")
+ .withPeriod(20000, 15, TimeUnit.MINUTES)
+ .withTriageValues(triageValues);
+
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
+ JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+ // validate that ALL of the triage values have been added
+ assertEquals(measurement.getTriageValues().get("x"), actual.get("x"));
+ assertEquals(measurement.getTriageValues().get("y"), actual.get("y"));
+ assertEquals(measurement.getTriageValues().get("z"), actual.get("z"));
}
/**
@@ -120,30 +195,27 @@ public class KafkaEmitterTest {
"invalid", new OnlineStatisticsProvider(),
"valid", 4);
+ // create the measurement with a Map as a triage value; this is not allowed
ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
+ .withDefinition(profile)
+ .withProfileName(profile.getProfile())
.withEntity("entity")
.withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(triageValues)
- .withDefinition(profile);
- handler.emit(measurement, collector);
+ .withTriageValues(triageValues);
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
+ JSONObject actual = expectJsonObject(kafkaEmitter, collector);
- // only the triage expression value itself should have been skipped, all others should be there
- JSONObject actual = (JSONObject) values.get(0);
- assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
- assertEquals(measurement.getEntity(), actual.get("entity"));
- assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
- assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
- assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
- assertNotNull(actual.get("timestamp"));
- assertEquals("profiler", actual.get("source.type"));
+ // validate the core parts of the message still exist
+ assertEquals(measurement.getProfileName(), actual.get("profile"));
+ assertEquals(measurement.getEntity(), actual.get("entity"));
+ assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertEquals("profiler", actual.get("source.type"));
- // the invalid expression should be skipped due to invalid type
+ // the invalid expression should be skipped and not included in the message
assertFalse(actual.containsKey("invalid"));
// but the valid expression should still be there
@@ -156,19 +228,18 @@ public class KafkaEmitterTest {
*/
@Test
public void testIntegerIsValidType() throws Exception {
+
+ // create a measurement with a triage value that is an integer
ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
+ .withDefinition(profile)
+ .withProfileName(profile.getProfile())
.withEntity("entity")
.withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(Collections.singletonMap("triage-key", 123))
- .withDefinition(profile);
- handler.emit(measurement, collector);
+ .withTriageValues(Collections.singletonMap("triage-key", 123));
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
- JSONObject actual = (JSONObject) values.get(0);
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
+ JSONObject actual = expectJsonObject(kafkaEmitter, collector);
// the triage expression is valid
assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
@@ -180,25 +251,37 @@ public class KafkaEmitterTest {
*/
@Test
public void testStringIsValidType() throws Exception {
+
+ // create a measurement with a triage value that is a string
ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName("profile")
+ .withDefinition(profile)
+ .withProfileName(profile.getProfile())
.withEntity("entity")
.withPeriod(20000, 15, TimeUnit.MINUTES)
- .withTriageValues(Collections.singletonMap("triage-key", "value"))
- .withDefinition(profile);
- handler.emit(measurement, collector);
+ .withTriageValues(Collections.singletonMap("triage-key", "value"));
- ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
- verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
- Values values = arg.getValue();
- assertTrue(values.get(0) instanceof JSONObject);
- JSONObject actual = (JSONObject) values.get(0);
+ // execute the test
+ kafkaEmitter.emit(measurement, collector);
+ JSONObject actual = expectJsonObject(kafkaEmitter, collector);
// the triage expression is valid
assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
}
/**
+ * Verifies that the KafkaEmitter does emit a JSONObject.
+ * @return The JSONObject that was emitted
+ */
+ private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, OutputCollector collector) {
+
+ ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+ verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), arg.capture());
+ Values values = arg.getValue();
+ assertTrue(values.get(0) instanceof JSONObject);
+ return (JSONObject) values.get(0);
+ }
+
+ /**
* Creates a profile definition based on a string of JSON.
* @param json The string of JSON.
*/