You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/11/12 19:57:02 UTC

[metron] branch master updated: METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 8686f52  METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556
8686f52 is described below

commit 8686f5265e68a5fb861f944b4b3b086c807618f2
Author: nickwallen <ni...@apache.org>
AuthorDate: Tue Nov 12 14:56:35 2019 -0500

    METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556
---
 .../metron/profiler/spark/BatchProfiler.java       |  4 +--
 .../spark/BatchProfilerIntegrationTest.java        | 29 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 571545e..43b42be 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -87,13 +87,13 @@ public class BatchProfiler implements Serializable {
 
     // find all routes for each message
     Dataset<MessageRoute> routes = telemetry
-            .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.bean(MessageRoute.class));
+            .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.kryo(MessageRoute.class));
     LOG.debug("Generated {} message route(s)", routes.cache().count());
 
     // build the profiles
     Dataset<ProfileMeasurementAdapter> measurements = routes
             .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
-            .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+            .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurementAdapter.class));
     LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
 
     // write the profile measurements to HBase
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index b36cf8c..72fd283 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -19,19 +19,24 @@
  */
 package org.apache.metron.profiler.spark;
 
+import com.google.common.collect.Maps;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.client.stellar.FixedLookback;
 import org.apache.metron.profiler.client.stellar.GetProfile;
 import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.profiler.spark.function.MessageRouterFunction;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.json.simple.JSONObject;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -41,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -62,6 +69,7 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -145,6 +153,14 @@ public class BatchProfilerIntegrationTest {
    *        "init": { "count": 0 },
    *        "update": { "count": "count + 1" },
    *        "result": "count"
+   *      },
+   *      {
+   *        "profile": "response-body-len",
+   *        "onlyif": "exists(response_body_len)",
+   *        "foreach": "ip_src_addr",
+   *        "init": { "len": 0 },
+   *        "update": { "len": "len + response_body_len" },
+   *        "result": "TO_INTEGER(len)"
    *      }
    *   ]
    * }
@@ -345,7 +361,15 @@ public class BatchProfilerIntegrationTest {
     *        "init": { "count": "STATS_INIT()" },
     *        "update": { "count": "STATS_ADD(count, 1)" },
     *        "result": "TO_INTEGER(STATS_COUNT(count))"
-    *      }
+    *      },
+    *      {
+    *        "profile": "response-body-len",
+    *        "onlyif": "exists(response_body_len)",
+    *        "foreach": "ip_src_addr",
+    *        "init": { "len": "STATS_INIT()" },
+    *        "update": { "len": "STATS_ADD(len, response_body_len)" },
+    *        "result": "TO_INTEGER(STATS_SUM(len))"
+    *       }
     *   ]
     * }
     */
@@ -386,6 +410,9 @@ public class BatchProfilerIntegrationTest {
 
     // there are 100 messages in all
     assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)", Boolean.class));
+
+    // check the sum of the `response_body_len` field
+    assertTrue(execute("[1029726] == PROFILE_GET('response-body-len', '192.168.138.158', window)", Boolean.class));
   }
 
   private Properties getGlobals() {