Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:03:58 UTC

[09/51] [abbrv] metron git commit: METRON-1772 Support alternative input formats in the Batch Profiler (nickwallen) closes apache/metron#1191

METRON-1772 Support alternative input formats in the Batch Profiler (nickwallen) closes apache/metron#1191


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1545978e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1545978e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1545978e

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 1545978e169a01e4a06735b8713c8fa65373a394
Parents: f83f0ac
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Sep 19 10:11:28 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Sep 19 10:11:28 2018 -0400

----------------------------------------------------------------------
 .../metron-profiler-spark/README.md             | 47 +++++++++-
 metron-analytics/metron-profiler-spark/pom.xml  | 18 ++--
 .../metron/profiler/spark/BatchProfiler.java    | 21 +++--
 .../profiler/spark/cli/BatchProfilerCLI.java    | 40 +++++++--
 .../spark/cli/BatchProfilerCLIOptions.java      | 10 ++-
 .../spark/BatchProfilerIntegrationTest.java     | 91 +++++++++++++++++---
 6 files changed, 189 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 3d7017c..99e8c7e 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -131,6 +131,14 @@ The Batch Profiler requires Spark version 2.3.0+.
 
 ## Running the Profiler
 
+* [Usage](#usage)
+* [Advanced Usage](#advanced-usage)
+* [Spark Execution](#spark-execution)
+* [Kerberos](#kerberos)
+* [Input Formats](#input-formats)
+
+### Usage
+
 A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler.  This script makes the following assumptions.
 
   * The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
@@ -156,11 +164,28 @@ The Batch Profiler accepts the following arguments when run from the command lin
 
 | Argument         | Description
 |---               |---
-| -p, --profiles   | The path to a file containing the profile definitions.
-| -c, --config     | The path to the profiler properties file.
-| -g, --globals    | The path to a properties file containing global properties.
+| -p, --profiles   | Path to the profile definitions.
+| -c, --config     | Path to the profiler properties file.
+| -g, --globals    | Path to the Stellar global config file.
+| -r, --reader     | Path to properties for the DataFrameReader.
 | -h, --help       | Print the help text.
 
+#### `--profiles`
+
+The path to a file containing the profile definition in JSON.
+
+#### `--config`
+
+The path to a file containing key-value properties for the Profiler. This file would contain the properties described under [Configuring the Profiler](#configuring-the-profiler).
+
+#### `--globals`
+
+The path to a file containing key-value properties that define the global properties. This can be used to customize how certain Stellar functions behave during execution.
+
+#### `--reader`
+
+The path to a file containing key-value properties that are passed to the DataFrameReader when reading the input telemetry. This allows additional customization for how the input telemetry is read.
+
 ### Spark Execution
 
 Spark supports a number of different [cluster managers](https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types).  The underlying cluster manager is transparent to the Profiler.  To run the Profiler on a particular cluster manager, it is just a matter of setting the appropriate options as defined in the Spark documentation.
@@ -191,10 +216,24 @@ The following command can be useful to review the logs generated when the Profil
 yarn logs -applicationId <application-id>
 ```
 
-#### Kerberos
+### Kerberos
 
 See the Spark documentation for information on running the Batch Profiler in a [secure, kerberized cluster](https://spark.apache.org/docs/latest/running-on-yarn.html#running-in-a-secure-cluster).
 
+### Input Formats
+
+The Profiler can consume archived telemetry stored in a variety of input formats.  By default, it is configured to consume the text/json that Metron archives in HDFS. This is often not the best format for archiving telemetry.  If you choose a different format, you should be able to configure the Profiler to consume it by doing the following.
+
+1. Edit [`profiler.batch.input.format`](#profilerbatchinputformat) and [`profiler.batch.input.path`](#profilerbatchinputpath) as needed.  For example, to read ORC you might do the following.
+
+  `$METRON_HOME/config/batch-profiler.properties`
+  ```
+  profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
+  profiler.batch.input.path=hdfs://localhost:9000/apps/metron/indexing/orc/\*/\*
+  ```
+
+1. If additional options are required for your input format, then use the [`--reader`](#--reader) command-line argument when launching the Batch Profiler as [described here](#advanced-usage).
+
 
 ## Configuring the Profiler
 

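Editor's note on the README change above: the `--reader` file is an ordinary Java properties file whose entries are handed to Spark's `DataFrameReader` as options. The commit does this conversion with Guava's `Maps.fromProperties`. The following is a minimal JDK-only sketch of the same idea, not the project's code. The class name `ReaderPropsSketch`, its methods, and the sample `sep` key are hypothetical. Only `header=true` appears in the commit's own CSV test.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of the commit.
final class ReaderPropsSketch {

  // Parse the text of a properties file, such as one passed via --reader.
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Copy the string-valued entries into a Map<String, String>, which is the
  // shape that DataFrameReader.options(java.util.Map) accepts.
  static Map<String, String> toOptions(Properties props) {
    Map<String, String> options = new HashMap<>();
    for (String name : props.stringPropertyNames()) {
      options.put(name, props.getProperty(name));
    }
    return options;
  }

  public static void main(String[] args) {
    // Sample contents of a reader properties file for CSV input (assumed values).
    Properties props = parse("header=true\nsep=,\n");
    System.out.println(toOptions(props));
  }
}
```

With Spark on the classpath, the resulting map could be passed as `spark.read().options(options).format(inputFormat).load(inputPath)`, which mirrors the call chain in `BatchProfiler.run` in this commit.
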
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 587b38c..668ee2c 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -25,6 +25,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <spark_antlr_version>4.7</spark_antlr_version>
     </properties>
     <dependencies>
         <dependency>
@@ -36,12 +37,11 @@
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.11</artifactId>
             <version>${global_spark_version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>antlr-runtime</artifactId>
-                </exclusion>
-            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>${spark_antlr_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -53,6 +53,12 @@
             <artifactId>metron-profiler-client</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr4-runtime</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index f999613..d75abc3 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
@@ -54,25 +55,29 @@ public class BatchProfiler implements Serializable {
    * Execute the Batch Profiler.
    *
    * @param spark The spark session.
-   * @param properties The profiler configuration properties.
+   * @param profilerProps The profiler configuration properties.
+   * @param globalProperties The Stellar global properties.
+   * @param readerProps The properties passed to the {@link org.apache.spark.sql.DataFrameReader}.
    * @param profiles The profile definitions.
    * @return The number of profile measurements produced.
    */
   public long run(SparkSession spark,
-                  Properties properties,
+                  Properties profilerProps,
                   Properties globalProperties,
+                  Properties readerProps,
                   ProfilerConfig profiles) {
 
     LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
     Map<String, String> globals = Maps.fromProperties(globalProperties);
 
-    String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class);
-    String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);
+    String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
+    String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
     LOG.debug("Loading telemetry from '{}'", inputPath);
 
     // fetch the archived telemetry
     Dataset<String> telemetry = spark
             .read()
+            .options(Maps.fromProperties(readerProps))
             .format(inputFormat)
             .load(inputPath)
             .as(Encoders.STRING());
@@ -85,13 +90,13 @@ public class BatchProfiler implements Serializable {
 
     // build the profiles
     Dataset<ProfileMeasurementAdapter> measurements = routes
-            .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING())
-            .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+            .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
+            .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class));
     LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
 
     // write the profile measurements to HBase
     long count = measurements
-            .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT())
+            .mapPartitions(new HBaseWriterFunction(profilerProps), Encoders.INT())
             .agg(sum("value"))
             .head()
             .getLong(0);
@@ -99,4 +104,4 @@ public class BatchProfiler implements Serializable {
 
     return count;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index bdcf231..29fe4a2 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -37,9 +37,10 @@ import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.Properties;
 
-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.CONFIGURATION_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;
 
 /**
@@ -54,7 +55,8 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
  *     metron-profiler-spark-<version>.jar \
  *     --config profiler.properties \
  *     --globals global.properties \
- *     --profiles profiles.json
+ *     --profiles profiles.json \
+ *     --reader reader.properties
  * }</pre>
  */
 public class BatchProfilerCLI implements Serializable {
@@ -63,6 +65,7 @@ public class BatchProfilerCLI implements Serializable {
 
   public static Properties globals;
   public static Properties profilerProps;
+  public static Properties readerProps;
   public static ProfilerConfig profiles;
 
   public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
@@ -71,6 +74,7 @@ public class BatchProfilerCLI implements Serializable {
     profilerProps = handleProfilerProperties(commandLine);
     globals = handleGlobals(commandLine);
     profiles = handleProfileDefinitions(commandLine);
+    readerProps = handleReaderProperties(commandLine);
 
     // the batch profiler must use 'event time'
     if(!profiles.getTimestampField().isPresent()) {
@@ -88,7 +92,7 @@ public class BatchProfilerCLI implements Serializable {
             .getOrCreate();
 
     BatchProfiler profiler = new BatchProfiler();
-    long count = profiler.run(spark, profilerProps, globals, profiles);
+    long count = profiler.run(spark, profilerProps, globals, readerProps, profiles);
     LOG.info("Profiler produced {} profile measurement(s)", count);
   }
 
@@ -117,13 +121,31 @@ public class BatchProfilerCLI implements Serializable {
    */
   private static Properties handleProfilerProperties(CommandLine commandLine) throws IOException {
     Properties config = new Properties();
-    if(CONFIGURATION_FILE.has(commandLine)) {
-      String propertiesPath = CONFIGURATION_FILE.get(commandLine);
+    if(PROFILER_PROPS_FILE.has(commandLine)) {
+      String propertiesPath = PROFILER_PROPS_FILE.get(commandLine);
 
       LOG.info("Loading profiler properties from '{}'", propertiesPath);
       config.load(new FileInputStream(propertiesPath));
 
-      LOG.info("Properties = {}", config.toString());
+      LOG.info("Profiler properties = {}", config.toString());
+    }
+    return config;
+  }
+
+  /**
+   * Load the properties for the {@link org.apache.spark.sql.DataFrameReader}.
+   *
+   * @param commandLine The command line.
+   */
+  private static Properties handleReaderProperties(CommandLine commandLine) throws IOException {
+    Properties config = new Properties();
+    if(READER_PROPS_FILE.has(commandLine)) {
+      String readerPropsPath = READER_PROPS_FILE.get(commandLine);
+
+      LOG.info("Loading reader properties from '{}'", readerPropsPath);
+      config.load(new FileInputStream(readerPropsPath));
+
+      LOG.info("Reader properties = {}", config.toString());
     }
     return config;
   }
@@ -171,4 +193,8 @@ public class BatchProfilerCLI implements Serializable {
   public static ProfilerConfig getProfiles() {
     return profiles;
   }
-}
\ No newline at end of file
+
+  public static Properties getReaderProps() {
+    return readerProps;
+  }
+}
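
Editor's note on `handleReaderProperties` above: the method returns an empty `Properties` when `--reader` is absent and otherwise loads the file through a `FileInputStream` that is never explicitly closed. One possible variation, sketched here with JDK classes only, closes the stream with try-with-resources. The class `PropertiesLoaderSketch` and the method `loadOrEmpty` are hypothetical names, and a `null` path stands in for "option not supplied".

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper for illustration only; it is not part of the commit.
final class PropertiesLoaderSketch {

  // Returns empty Properties when no path was supplied; otherwise loads the
  // file and closes the stream even if loading fails.
  static Properties loadOrEmpty(String pathOrNull) {
    Properties config = new Properties();
    if (pathOrNull == null) {
      return config;
    }
    try (InputStream in = Files.newInputStream(Paths.get(pathOrNull))) {
      config.load(in);
    } catch (IOException e) {
      // Wrapped so that callers are not forced to declare IOException.
      throw new UncheckedIOException(e);
    }
    return config;
  }
}
```

The commit itself propagates `IOException` from `main`; wrapping it in `UncheckedIOException` here is a choice made only to keep the sketch short.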

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index f5dfe12..d58728a 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -36,12 +36,12 @@ import java.util.function.Supplier;
 public enum BatchProfilerCLIOptions {
 
   PROFILE_DEFN_FILE(() -> {
-    Option o = new Option("p", "profiles", true, "Path to a file containing profile definitions.");
+    Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
     o.setRequired(true);
     return o;
   }),
 
-  CONFIGURATION_FILE(() -> {
+  PROFILER_PROPS_FILE(() -> {
     Option o = new Option("c", "config", true, "Path to the profiler properties file.");
     o.setRequired(false);
     return o;
@@ -53,6 +53,12 @@ public enum BatchProfilerCLIOptions {
     return o;
   }),
 
+  READER_PROPS_FILE(() -> {
+    Option o = new Option("r", "reader", true, "Path to properties for the DataFrameReader.");
+    o.setRequired(false);
+    return o;
+  }),
+
   HELP(() -> {
     Option o = new Option("h", "help", false, "Usage instructions.");
     o.setRequired(false);

http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 376623c..87c4246 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -30,11 +30,14 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -82,8 +85,12 @@ public class BatchProfilerIntegrationTest {
   private static String profileJson;
   private static SparkSession spark;
   private Properties profilerProperties;
+  private Properties readerProperties;
   private StellarStatefulExecutor executor;
 
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
   @BeforeClass
   public static void setupSpark() {
     SparkConf conf = new SparkConf()
@@ -105,12 +112,9 @@ public class BatchProfilerIntegrationTest {
 
   @Before
   public void setup() {
+    readerProperties = new Properties();
     profilerProperties = new Properties();
 
-    // the input telemetry is read from the local filesystem
-    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
-    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
-
     // the output will be written to a mock HBase table
     String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
     String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
@@ -147,15 +151,80 @@ public class BatchProfilerIntegrationTest {
    * produced will center around this date.
    */
   @Test
-  public void testBatchProfiler() throws Exception {
-    // run the batch profiler
+  public void testBatchProfilerWithJSON() throws Exception {
+    // the input telemetry is text/json stored in the local filesystem
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithORC() throws Exception {
+    // re-write the test data as ORC
+    String pathToORC = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .format("org.apache.spark.sql.execution.datasources.orc")
+            .save(pathToORC);
+
+    // tell the profiler to use the ORC input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "org.apache.spark.sql.execution.datasources.orc");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithCSV() throws Exception {
+    // re-write the test data as a CSV with a header record
+    String pathToCSV = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .option("header", "true")
+            .format("csv")
+            .save(pathToCSV);
+
+    // tell the profiler to use the CSV input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");
+
+    // set a reader property; tell the reader to expect a header
+    readerProperties.put("header", "true");
+
     BatchProfiler profiler = new BatchProfiler();
-    profiler.run(spark, profilerProperties, getGlobals(), getProfile());
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  /**
+   * Validates the profiles that were built.
+   *
+   * These tests use the Batch Profiler to seed two profiles with archived telemetry.  The first profile
+   * called 'count-by-ip', counts the number of messages by 'ip_src_addr'.  The second profile called
+   * 'total-count', counts the total number of messages.
+   */
+  private void validateProfiles() {
+    // the max timestamp in the data is around July 7, 2018
+    assign("maxTimestamp", "1530978728982L");
 
-    // validate the measurements written by the batch profiler using `PROFILE_GET`
-    // the 'window' looks up to 5 hours before the last timestamp contained in the telemetry
-    assign("lastTimestamp", "1530978728982L");
-    assign("window", "PROFILE_WINDOW('from 5 hours ago', lastTimestamp)");
+    // the 'window' looks up to 5 hours before the max timestamp
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
 
     // there are 26 messages where ip_src_addr = 192.168.66.1
     assertTrue(execute("[26] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class));