You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/28 20:42:06 UTC

metron git commit: METRON-1736 Enhance Batch Profiler Integration Test (nickwallen) closes apache/metron#1162

Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1699-create-batch-profiler c7a3dc230 -> 3cb088c06


METRON-1736 Enhance Batch Profiler Integration Test (nickwallen) closes apache/metron#1162


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3cb088c0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3cb088c0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3cb088c0

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 3cb088c06e3b5749cf32c37dada395aff9ea41d2
Parents: c7a3dc2
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Aug 28 16:41:47 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Aug 28 16:41:47 2018 -0400

----------------------------------------------------------------------
 metron-analytics/metron-profiler-spark/pom.xml  |   6 +
 .../spark/BatchProfilerIntegrationTest.java     | 132 +++++++++++++++----
 2 files changed, 116 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3cb088c0/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 2d5ec98..387dce4 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -50,6 +50,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-profiler-client</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-common</artifactId>
             <version>${project.parent.version}</version>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/metron/blob/3cb088c0/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index f560740..376623c 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -19,11 +19,16 @@
  */
 package org.apache.metron.profiler.spark;
 
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.profiler.client.stellar.FixedLookback;
+import org.apache.metron.profiler.client.stellar.GetProfile;
+import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
@@ -31,21 +36,53 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * An integration test for the {@link BatchProfiler}.
+ */
 public class BatchProfilerIntegrationTest {
 
+  /**
+   * {
+   *   "timestampField": "timestamp",
+   *   "profiles": [
+   *      {
+   *        "profile": "count-by-ip",
+   *        "foreach": "ip_src_addr",
+   *        "init": { "count": 0 },
+   *        "update": { "count" : "count + 1" },
+   *        "result": "count"
+   *      },
+   *      {
+   *        "profile": "total-count",
+   *        "foreach": "'total'",
+   *        "init": { "count": 0 },
+   *        "update": { "count": "count + 1" },
+   *        "result": "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private static String profileJson;
   private static SparkSession spark;
-  private MockHTable profilerTable;
   private Properties profilerProperties;
+  private StellarStatefulExecutor executor;
 
   @BeforeClass
   public static void setupSpark() {
@@ -70,42 +107,93 @@ public class BatchProfilerIntegrationTest {
   public void setup() {
     profilerProperties = new Properties();
 
-    // define the source of the input telemetry
+    // the input telemetry is read from the local filesystem
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
     profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
 
-    // define where the output will go
+    // the output will be written to a mock HBase table
     String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
     String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
     profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
 
     // create the mock hbase table
-    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
+    MockHBaseTableProvider.addToCache(tableName, columnFamily);
+
+    // define the globals required by `PROFILE_GET`
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+    }};
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(GetProfile.class)
+                    .withClass(FixedLookback.class)
+                    .withClass(WindowLookback.class),
+            new Context.Builder()
+                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+                    .build());
   }
 
+  /**
+   * This test uses the Batch Profiler to seed two profiles using archived telemetry.
+   *
+   * The first profile counts the number of messages by 'ip_src_addr'.  The second profile counts the total number
+   * of messages.
+   *
+   * The archived telemetry contains timestamps from around July 7, 2018.  All of the measurements
+   * produced will center around this date.
+   */
   @Test
-  public void testBatchProfiler() {
-
+  public void testBatchProfiler() throws Exception {
     // run the batch profiler
     BatchProfiler profiler = new BatchProfiler();
     profiler.run(spark, profilerProperties, getGlobals(), getProfile());
 
-    List<Put> puts = profilerTable.getPutLog();
-    assertEquals(2, puts.size());
-  }
+    // validate the measurements written by the batch profiler using `PROFILE_GET`
+    // the 'window' looks up to 5 hours before the last timestamp contained in the telemetry
+    assign("lastTimestamp", "1530978728982L");
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', lastTimestamp)");
+
+    // there are 26 messages where ip_src_addr = 192.168.66.1
+    assertTrue(execute("[26] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class));
+
+    // there are 74 messages where ip_src_addr = 192.168.138.158
+    assertTrue(execute("[74] == PROFILE_GET('count-by-ip', '192.168.138.158', window)", Boolean.class));
 
+    // there are 100 messages in all
+    assertTrue(execute("[100] == PROFILE_GET('total-count', 'total', window)", Boolean.class));
+  }
 
-  private ProfilerConfig getProfile() {
-    ProfileConfig profile = new ProfileConfig()
-            .withProfile("profile1")
-            .withForeach("ip_src_addr")
-            .withUpdate("count", "count + 1")
-            .withResult("count");
-    return new ProfilerConfig()
-            .withProfile(profile);
+  private ProfilerConfig getProfile() throws IOException {
+    return ProfilerConfig.fromJSON(profileJson);
   }
 
   private Properties getGlobals() {
     return new Properties();
   }
+
+  /**
+   * Assign the result of executing a Stellar expression to a variable.
+   *
+   * @param var The variable to assign.
+   * @param expression The expression to execute.
+   */
+  private void assign(String var, String expression) {
+    executor.assign(var, expression, Collections.emptyMap());
+  }
+
+  /**
+   * Execute a Stellar expression.
+   *
+   * @param expression The Stellar expression to execute.
+   * @param clazz The type expected of the expression's result.
+   * @param <T> The type expected of the expression's result.
+   * @return The result of executing the Stellar expression.
+   */
+  private <T> T execute(String expression, Class<T> clazz) {
+    return executor.execute(expression, Collections.emptyMap(), clazz);
+  }
 }