You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/10/04 21:45:56 UTC

metron git commit: METRON-1791 Add GUID to Messages Produced by Profiler (nickwallen) closes apache/metron#1210

Repository: metron
Updated Branches:
  refs/heads/master b872fdcd0 -> 7e222fa47


METRON-1791 Add GUID to Messages Produced by Profiler (nickwallen) closes apache/metron#1210


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7e222fa4
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7e222fa4
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7e222fa4

Branch: refs/heads/master
Commit: 7e222fa47aedc4ab3bafec54590ec0bc73b5f75c
Parents: b872fdc
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Oct 4 17:45:32 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Oct 4 17:45:32 2018 -0400

----------------------------------------------------------------------
 .../metron/profiler/storm/KafkaEmitter.java     | 31 +++++++++---
 .../zookeeper/triage-result/profiler.json       | 20 ++++++++
 .../metron/profiler/storm/KafkaEmitterTest.java |  4 ++
 .../integration/ProfilerIntegrationTest.java    | 52 ++++++++++++++------
 4 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
index af1fbca..adbde1b 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java
@@ -21,6 +21,7 @@ package org.apache.metron.profiler.storm;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.common.Constants;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will
@@ -42,6 +44,14 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  public static final String PROFILE_FIELD = "profile";
+  public static final String ENTITY_FIELD = "entity";
+  public static final String PERIOD_ID_FIELD = "period";
+  public static final String PERIOD_START_FIELD = "period.start";
+  public static final String PERIOD_END_FIELD = "period.end";
+  public static final String TIMESTAMP_FIELD = "timestamp";
+  public static final String ALERT_FIELD = "is_alert";
+
   /**
    * The stream identifier used for this destination;
    */
@@ -126,14 +136,15 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
   private JSONObject createMessage(ProfileMeasurement measurement) {
 
     JSONObject message = new JSONObject();
-    message.put("profile", measurement.getDefinition().getProfile());
-    message.put("entity", measurement.getEntity());
-    message.put("period", measurement.getPeriod().getPeriod());
-    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
-    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
-    message.put("timestamp", System.currentTimeMillis());
-    message.put("source.type", sourceType);
-    message.put("is_alert", "true");
+    message.put(PROFILE_FIELD, measurement.getDefinition().getProfile());
+    message.put(ENTITY_FIELD, measurement.getEntity());
+    message.put(PERIOD_ID_FIELD, measurement.getPeriod().getPeriod());
+    message.put(PERIOD_START_FIELD, measurement.getPeriod().getStartTimeMillis());
+    message.put(PERIOD_END_FIELD, measurement.getPeriod().getEndTimeMillis());
+    message.put(TIMESTAMP_FIELD, System.currentTimeMillis());
+    message.put(Constants.SENSOR_TYPE, sourceType);
+    message.put(ALERT_FIELD, "true");
+    message.put(Constants.GUID, UUID.randomUUID().toString());
     return message;
   }
 
@@ -158,6 +169,10 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable {
     this.streamId = streamId;
   }
 
+  public String getSourceType() {
+    return sourceType;
+  }
+
   public void setSourceType(String sourceType) {
     this.sourceType = sourceType;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json
new file mode 100644
index 0000000..7d63da7
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json
@@ -0,0 +1,20 @@
+{
+  "profiles": [
+    {
+      "profile": "profile-with-triage",
+      "foreach": "'global'",
+      "update": {
+        "stats": "STATS_ADD(stats, 1)"
+      },
+      "result": {
+        "profile": "stats",
+        "triage": {
+          "min": "STATS_MIN(stats)",
+          "max": "STATS_MAX(stats)",
+          "mean": "STATS_MEAN(stats)"
+        }
+      }
+    }
+  ],
+  "timestampField": "timestamp"
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
index 51ca3a4..86849ac 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
@@ -22,6 +22,7 @@ package org.apache.metron.profiler.storm;
 
 import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.profiler.ProfileMeasurement;
@@ -148,6 +149,7 @@ public class KafkaEmitterTest {
     assertEquals(measurement.getPeriod().getEndTimeMillis(),      actual.get("period.end"));
     assertEquals("profiler",                                      actual.get("source.type"));
     assertNotNull(actual.get("timestamp"));
+    assertNotNull(actual.get(Constants.GUID));
 
     // validate that the triage value has been added
     assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
@@ -214,6 +216,8 @@ public class KafkaEmitterTest {
     assertEquals(measurement.getPeriod().getStartTimeMillis(),    actual.get("period.start"));
     assertEquals(measurement.getPeriod().getEndTimeMillis(),      actual.get("period.end"));
     assertEquals("profiler",                                      actual.get("source.type"));
+    assertNotNull(actual.get("timestamp"));
+    assertNotNull(actual.get(Constants.GUID));
 
     // the invalid expression should be skipped and not included in the message
     assertFalse(actual.containsKey("invalid"));

http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index 4389d42..64a1482 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -40,6 +40,8 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.storm.Config;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -49,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Collections;
@@ -68,6 +69,13 @@ import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PRO
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.storm.KafkaEmitter.ALERT_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.ENTITY_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_END_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_ID_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_START_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.PROFILE_FIELD;
+import static org.apache.metron.profiler.storm.KafkaEmitter.TIMESTAMP_FIELD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -285,18 +293,35 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
   }
 
-  /**
-   * Generates an error message for if the byte comparison fails.
-   *
-   * @param expected The expected value.
-   * @param actual The actual value.
-   * @return
-   * @throws UnsupportedEncodingException
-   */
-  private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException {
-    return String.format("expected '%s', got '%s'",
-              new String(expected, "UTF-8"),
-              new String(actual, "UTF-8"));
+  @Test
+  public void testProfileWithTriageResult() throws Exception {
+    uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    List<String> telemetry = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
+    kafkaComponent.writeMessages(inputTopic, telemetry);
+
+    // wait until the triage message is output to kafka
+    waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90)));
+
+    List<byte[]> outputMessages = kafkaComponent.readMessages(outputTopic);
+    assertEquals(1, outputMessages.size());
+
+    // validate the triage message
+    JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8"));
+    assertEquals("profile-with-triage", message.get(PROFILE_FIELD));
+    assertEquals("global",              message.get(ENTITY_FIELD));
+    assertEquals(76548935L,             message.get(PERIOD_ID_FIELD));
+    assertEquals(1530978700000L,        message.get(PERIOD_START_FIELD));
+    assertEquals(1530978720000L,        message.get(PERIOD_END_FIELD));
+    assertEquals("profiler",            message.get(Constants.SENSOR_TYPE));
+    assertEquals("true",                message.get(ALERT_FIELD));
+    assertEquals(1.0,                   message.get("min"));
+    assertEquals(1.0,                   message.get("max"));
+    assertEquals(1.0,                   message.get("mean"));
+    assertTrue(message.containsKey(TIMESTAMP_FIELD));
+    assertTrue(message.containsKey(Constants.GUID));
   }
 
   private static String getMessage(String ipSource, long timestamp) {
@@ -471,7 +496,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
    */
   private <T> T execute(String expression, Class<T> clazz) {
     T results = executor.execute(expression, Collections.emptyMap(), clazz);
-
     LOG.debug("{} = {}", expression, results);
     return results;
   }