You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/28 20:42:06 UTC
metron git commit: METRON-1736 Enhance Batch Profiler Integration Test (nickwallen) closes apache/metron#1162
Repository: metron
Updated Branches:
refs/heads/feature/METRON-1699-create-batch-profiler c7a3dc230 -> 3cb088c06
METRON-1736 Enhance Batch Profiler Integration Test (nickwallen) closes apache/metron#1162
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3cb088c0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3cb088c0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3cb088c0
Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 3cb088c06e3b5749cf32c37dada395aff9ea41d2
Parents: c7a3dc2
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Aug 28 16:41:47 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Aug 28 16:41:47 2018 -0400
----------------------------------------------------------------------
metron-analytics/metron-profiler-spark/pom.xml | 6 +
.../spark/BatchProfilerIntegrationTest.java | 132 +++++++++++++++----
2 files changed, 116 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/3cb088c0/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 2d5ec98..387dce4 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -50,6 +50,12 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-profiler-client</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-common</artifactId>
<version>${project.parent.version}</version>
<exclusions>
http://git-wip-us.apache.org/repos/asf/metron/blob/3cb088c0/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index f560740..376623c 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -19,11 +19,16 @@
*/
package org.apache.metron.profiler.spark;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.profiler.client.stellar.FixedLookback;
+import org.apache.metron.profiler.client.stellar.GetProfile;
+import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
@@ -31,21 +36,53 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.List;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+/**
+ * An integration test for the {@link BatchProfiler}.
+ */
public class BatchProfilerIntegrationTest {
+ /**
+ * {
+ * "timestampField": "timestamp",
+ * "profiles": [
+ * {
+ * "profile": "count-by-ip",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": 0 },
+ * "update": { "count" : "count + 1" },
+ * "result": "count"
+ * },
+ * {
+ * "profile": "total-count",
+ * "foreach": "'total'",
+ * "init": { "count": 0 },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private static String profileJson;
private static SparkSession spark;
- private MockHTable profilerTable;
private Properties profilerProperties;
+ private StellarStatefulExecutor executor;
@BeforeClass
public static void setupSpark() {
@@ -70,42 +107,93 @@ public class BatchProfilerIntegrationTest {
public void setup() {
profilerProperties = new Properties();
- // define the source of the input telemetry
+ // the input telemetry is read from the local filesystem
profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
- // define where the output will go
+ // the output will be written to a mock HBase table
String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
// create the mock hbase table
- profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
+ MockHBaseTableProvider.addToCache(tableName, columnFamily);
+
+ // define the globals required by `PROFILE_GET`
+ Map<String, Object> global = new HashMap<String, Object>() {{
+ put(PROFILER_HBASE_TABLE.getKey(), tableName);
+ put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+ put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+ }};
+
+ // create the stellar execution environment
+ executor = new DefaultStellarStatefulExecutor(
+ new SimpleFunctionResolver()
+ .withClass(GetProfile.class)
+ .withClass(FixedLookback.class)
+ .withClass(WindowLookback.class),
+ new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+ .build());
}
+ /**
+ * This test uses the Batch Profiler to seed two profiles using archived telemetry.
+ *
+ * The first profile counts the number of messages by 'ip_src_addr'. The second profile counts the total number
+ * of messages.
+ *
+ * The archived telemetry contains timestamps from around July 7, 2018. All of the measurements
+ * produced will center around this date.
+ */
@Test
- public void testBatchProfiler() {
-
+ public void testBatchProfiler() throws Exception {
// run the batch profiler
BatchProfiler profiler = new BatchProfiler();
profiler.run(spark, profilerProperties, getGlobals(), getProfile());
- List<Put> puts = profilerTable.getPutLog();
- assertEquals(2, puts.size());
- }
+ // validate the measurements written by the batch profiler using `PROFILE_GET`
+ // the 'window' looks up to 5 hours before the last timestamp contained in the telemetry
+ assign("lastTimestamp", "1530978728982L");
+ assign("window", "PROFILE_WINDOW('from 5 hours ago', lastTimestamp)");
+
+ // there are 26 messages where ip_src_addr = 192.168.66.1
+ assertTrue(execute("[26] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class));
+
+ // there are 74 messages where ip_src_addr = 192.168.138.158
+ assertTrue(execute("[74] == PROFILE_GET('count-by-ip', '192.168.138.158', window)", Boolean.class));
+ // there are 100 messages in all
+ assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)", Boolean.class));
+ }
- private ProfilerConfig getProfile() {
- ProfileConfig profile = new ProfileConfig()
- .withProfile("profile1")
- .withForeach("ip_src_addr")
- .withUpdate("count", "count + 1")
- .withResult("count");
- return new ProfilerConfig()
- .withProfile(profile);
+ private ProfilerConfig getProfile() throws IOException {
+ return ProfilerConfig.fromJSON(profileJson);
}
private Properties getGlobals() {
return new Properties();
}
+
+ /**
+ * Assign a value to the result of an expression.
+ *
+ * @param var The variable to assign.
+ * @param expression The expression to execute.
+ */
+ private void assign(String var, String expression) {
+ executor.assign(var, expression, Collections.emptyMap());
+ }
+
+ /**
+ * Execute a Stellar expression.
+ *
+ * @param expression The Stellar expression to execute.
+ * @param clazz
+ * @param <T>
+ * @return The result of executing the Stellar expression.
+ */
+ private <T> T execute(String expression, Class<T> clazz) {
+ return executor.execute(expression, Collections.emptyMap(), clazz);
+ }
}