You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/11/12 19:57:02 UTC
[metron] branch master updated: METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 8686f52 METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556
8686f52 is described below
commit 8686f5265e68a5fb861f944b4b3b086c807618f2
Author: nickwallen <ni...@apache.org>
AuthorDate: Tue Nov 12 14:56:35 2019 -0500
METRON-2284 Metron Profiler for Spark Doesn't Work as Expected (nickwallen) closes apache/metron#1556
---
.../metron/profiler/spark/BatchProfiler.java | 4 +--
.../spark/BatchProfilerIntegrationTest.java | 29 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 571545e..43b42be 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -87,13 +87,13 @@ public class BatchProfiler implements Serializable {
// find all routes for each message
Dataset<MessageRoute> routes = telemetry
- .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.bean(MessageRoute.class));
+ .flatMap(messageRouterFunction(profilerProps, profiles, globals), Encoders.kryo(MessageRoute.class));
LOG.debug("Generated {} message route(s)", routes.cache().count());
// build the profiles
Dataset<ProfileMeasurementAdapter> measurements = routes
.groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
- .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+ .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurementAdapter.class));
LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
// write the profile measurements to HBase
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index b36cf8c..72fd283 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -19,19 +19,24 @@
*/
package org.apache.metron.profiler.spark;
+import com.google.common.collect.Maps;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.MessageRoute;
import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.profiler.spark.function.MessageRouterFunction;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
+import org.json.simple.JSONObject;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -41,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -62,6 +69,7 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -145,6 +153,14 @@ public class BatchProfilerIntegrationTest {
* "init": { "count": 0 },
* "update": { "count": "count + 1" },
* "result": "count"
+ * },
+ * {
+ * "profile": "response-body-len",
+ * "onlyif": "exists(response_body_len)",
+ * "foreach": "ip_src_addr",
+ * "init": { "len": 0 },
+ * "update": { "len": "len + response_body_len" },
+ * "result": "TO_INTEGER(len)"
* }
* ]
* }
@@ -345,7 +361,15 @@ public class BatchProfilerIntegrationTest {
* "init": { "count": "STATS_INIT()" },
* "update": { "count": "STATS_ADD(count, 1)" },
* "result": "TO_INTEGER(STATS_COUNT(count))"
- * }
+ * },
+ * {
+ * "profile": "response-body-len",
+ * "onlyif": "exists(response_body_len)",
+ * "foreach": "ip_src_addr",
+ * "init": { "len": "STATS_INIT()" },
+ * "update": { "len": "STATS_ADD(len, response_body_len)" },
+ * "result": "TO_INTEGER(STATS_SUM(len))"
+ * }
* ]
* }
*/
@@ -386,6 +410,9 @@ public class BatchProfilerIntegrationTest {
// there are 100 messages in all
assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)", Boolean.class));
+
+ // check the sum of the `response_body_len` field
+ assertTrue(execute("[1029726] == PROFILE_GET('response-body-len', '192.168.138.158', window)", Boolean.class));
}
private Properties getGlobals() {