You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/10/04 21:45:56 UTC
metron git commit: METRON-1791 Add GUID to Messages Produced by Profiler (nickwallen) closes apache/metron#1210
Repository: metron
Updated Branches:
refs/heads/master b872fdcd0 -> 7e222fa47
METRON-1791 Add GUID to Messages Produced by Profiler (nickwallen) closes apache/metron#1210
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7e222fa4
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7e222fa4
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7e222fa4
Branch: refs/heads/master
Commit: 7e222fa47aedc4ab3bafec54590ec0bc73b5f75c
Parents: b872fdc
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Oct 4 17:45:32 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Oct 4 17:45:32 2018 -0400
----------------------------------------------------------------------
.../metron/profiler/storm/KafkaEmitter.java | 31 +++++++++---
.../zookeeper/triage-result/profiler.json | 20 ++++++++
.../metron/profiler/storm/KafkaEmitterTest.java | 4 ++
.../integration/ProfilerIntegrationTest.java | 52 ++++++++++++++------
4 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
index af1fbca..adbde1b 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
@@ -21,6 +21,7 @@ package org.apache.metron.profiler.storm;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.common.Constants;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.Map;
+import java.util.UUID;
/**
* Responsible for emitting a {@link ProfileMeasurement} to an output stream that will
@@ -42,6 +44,14 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String PROFILE_FIELD = "profile";
+ public static final String ENTITY_FIELD = "entity";
+ public static final String PERIOD_ID_FIELD = "period";
+ public static final String PERIOD_START_FIELD = "period.start";
+ public static final String PERIOD_END_FIELD = "period.end";
+ public static final String TIMESTAMP_FIELD = "timestamp";
+ public static final String ALERT_FIELD = "is_alert";
+
/**
* The stream identifier used for this destination;
*/
@@ -126,14 +136,15 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
private JSONObject createMessage(ProfileMeasurement measurement) {
JSONObject message = new JSONObject();
- message.put("profile", measurement.getDefinition().getProfile());
- message.put("entity", measurement.getEntity());
- message.put("period", measurement.getPeriod().getPeriod());
- message.put("period.start", measurement.getPeriod().getStartTimeMillis());
- message.put("period.end", measurement.getPeriod().getEndTimeMillis());
- message.put("timestamp", System.currentTimeMillis());
- message.put("source.type", sourceType);
- message.put("is_alert", "true");
+ message.put(PROFILE_FIELD, measurement.getDefinition().getProfile());
+ message.put(ENTITY_FIELD, measurement.getEntity());
+ message.put(PERIOD_ID_FIELD, measurement.getPeriod().getPeriod());
+ message.put(PERIOD_START_FIELD, measurement.getPeriod().getStartTimeMillis());
+ message.put(PERIOD_END_FIELD, measurement.getPeriod().getEndTimeMillis());
+ message.put(TIMESTAMP_FIELD, System.currentTimeMillis());
+ message.put(Constants.SENSOR_TYPE, sourceType);
+ message.put(ALERT_FIELD, "true");
+ message.put(Constants.GUID, UUID.randomUUID().toString());
return message;
}
@@ -158,6 +169,10 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
this.streamId = streamId;
}
+ public String getSourceType() {
+ return sourceType;
+ }
+
public void setSourceType(String sourceType) {
this.sourceType = sourceType;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json
new file mode 100644
index 0000000..7d63da7
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json
@@ -0,0 +1,20 @@
+{
+ "profiles": [
+ {
+ "profile": "profile-with-triage",
+ "foreach": "'global'",
+ "update": {
+ "stats": "STATS_ADD(stats, 1)"
+ },
+ "result": {
+ "profile": "stats",
+ "triage": {
+ "min": "STATS_MIN(stats)",
+ "max": "STATS_MAX(stats)",
+ "mean": "STATS_MEAN(stats)"
+ }
+ }
+ }
+ ],
+ "timestampField": "timestamp"
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
index 51ca3a4..86849ac 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
@@ -22,6 +22,7 @@ package org.apache.metron.profiler.storm;
import com.google.common.collect.ImmutableMap;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.profiler.ProfileMeasurement;
@@ -148,6 +149,7 @@ public class KafkaEmitterTest {
assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
assertEquals("profiler", actual.get("source.type"));
assertNotNull(actual.get("timestamp"));
+ assertNotNull(actual.get(Constants.GUID));
// validate that the triage value has been added
assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
@@ -214,6 +216,8 @@ public class KafkaEmitterTest {
assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
assertEquals("profiler", actual.get("source.type"));
+ assertNotNull(actual.get("timestamp"));
+ assertNotNull(actual.get(Constants.GUID));
// the invalid expression should be skipped and not included in the message
assertFalse(actual.containsKey("invalid"));
http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index 4389d42..64a1482 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -40,6 +40,8 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.storm.Config;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -49,7 +51,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.UnsupportedEncodingException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
@@ -68,6 +69,13 @@ import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PRO
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.storm.KafkaEmitter.ALERT_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.ENTITY_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_END_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_ID_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_START_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PROFILE_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.TIMESTAMP_FIELD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -285,18 +293,35 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
}
- /**
- * Generates an error message for if the byte comparison fails.
- *
- * @param expected The expected value.
- * @param actual The actual value.
- * @return
- * @throws UnsupportedEncodingException
- */
- private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException {
- return String.format("expected '%s', got '%s'",
- new String(expected, "UTF-8"),
- new String(actual, "UTF-8"));
+ @Test
+ public void testProfileWithTriageResult() throws Exception {
+ uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result");
+
+ // start the topology and write test messages to kafka
+ fluxComponent.submitTopology();
+ List<String> telemetry = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
+ kafkaComponent.writeMessages(inputTopic, telemetry);
+
+ // wait until the triage message is output to kafka
+ waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90)));
+
+ List<byte[]> outputMessages = kafkaComponent.readMessages(outputTopic);
+ assertEquals(1, outputMessages.size());
+
+ // validate the triage message
+ JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8"));
+ assertEquals("profile-with-triage", message.get(PROFILE_FIELD));
+ assertEquals("global", message.get(ENTITY_FIELD));
+ assertEquals(76548935L, message.get(PERIOD_ID_FIELD));
+ assertEquals(1530978700000L, message.get(PERIOD_START_FIELD));
+ assertEquals(1530978720000L, message.get(PERIOD_END_FIELD));
+ assertEquals("profiler", message.get(Constants.SENSOR_TYPE));
+ assertEquals("true", message.get(ALERT_FIELD));
+ assertEquals(1.0, message.get("min"));
+ assertEquals(1.0, message.get("max"));
+ assertEquals(1.0, message.get("mean"));
+ assertTrue(message.containsKey(TIMESTAMP_FIELD));
+ assertTrue(message.containsKey(Constants.GUID));
}
private static String getMessage(String ipSource, long timestamp) {
@@ -471,7 +496,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
*/
private <T> T execute(String expression, Class<T> clazz) {
T results = executor.execute(expression, Collections.emptyMap(), clazz);
-
LOG.debug("{} = {}", expression, results);
return results;
}