Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:03:58 UTC
[09/51] [abbrv] metron git commit: METRON-1772 Support alternative input formats in the Batch Profiler (nickwallen) closes apache/metron#1191
METRON-1772 Support alternative input formats in the Batch Profiler (nickwallen) closes apache/metron#1191
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1545978e
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1545978e
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1545978e
Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 1545978e169a01e4a06735b8713c8fa65373a394
Parents: f83f0ac
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Sep 19 10:11:28 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Sep 19 10:11:28 2018 -0400
----------------------------------------------------------------------
.../metron-profiler-spark/README.md | 47 +++++++++-
metron-analytics/metron-profiler-spark/pom.xml | 18 ++--
.../metron/profiler/spark/BatchProfiler.java | 21 +++--
.../profiler/spark/cli/BatchProfilerCLI.java | 40 +++++++--
.../spark/cli/BatchProfilerCLIOptions.java | 10 ++-
.../spark/BatchProfilerIntegrationTest.java | 91 +++++++++++++++++---
6 files changed, 189 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 3d7017c..99e8c7e 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -131,6 +131,14 @@ The Batch Profiler requires Spark version 2.3.0+.
## Running the Profiler
+* [Usage](#usage)
+* [Advanced Usage](#advanced-usage)
+* [Spark Execution](#spark-execution)
+* [Kerberos](#kerberos)
+* [Input Formats](#input-formats)
+
+### Usage
+
A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler. This script makes the following assumptions.
* The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
@@ -156,11 +164,28 @@ The Batch Profiler accepts the following arguments when run from the command lin
| Argument | Description
|--- |---
-| -p, --profiles | The path to a file containing the profile definitions.
-| -c, --config | The path to the profiler properties file.
-| -g, --globals | The path to a properties file containing global properties.
+| -p, --profiles | Path to the profile definitions.
+| -c, --config | Path to the profiler properties file.
+| -g, --globals | Path to the Stellar global config file.
+| -r, --reader | Path to properties for the DataFrameReader.
| -h, --help | Print the help text.
+#### `--profiles`
+
+The path to a file containing the profile definition in JSON.
+
+#### `--config`
+
+The path to a file containing key-value properties for the Profiler. This file would contain the properties described under [Configuring the Profiler](#configuring-the-profiler).
+
+#### `--globals`
+
+The path to a file containing key-value properties that define the global properties. This can be used to customize how certain Stellar functions behave during execution.
+
+#### `--reader`
+
+The path to a file containing key-value properties that are passed to the DataFrameReader when reading the input telemetry. This allows additional customization for how the input telemetry is read.
+
### Spark Execution
Spark supports a number of different [cluster managers](https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types). The underlying cluster manager is transparent to the Profiler. To run the Profiler on a particular cluster manager, it is just a matter of setting the appropriate options as defined in the Spark documentation.
@@ -191,10 +216,24 @@ The following command can be useful to review the logs generated when the Profil
yarn logs -applicationId <application-id>
```
-#### Kerberos
+### Kerberos
See the Spark documentation for information on running the Batch Profiler in a [secure, kerberized cluster](https://spark.apache.org/docs/latest/running-on-yarn.html#running-in-a-secure-cluster).
+### Input Formats
+
+The Profiler can consume archived telemetry stored in a variety of input formats. By default, it is configured to consume the text/json that Metron archives in HDFS. This is often not the best format for archiving telemetry. If you choose a different format, you should be able to configure the Profiler to consume it by doing the following.
+
+1. Edit [`profiler.batch.input.format`](#profilerbatchinputformat) and [`profiler.batch.input.path`](#profilerbatchinputpath) as needed. For example, to read ORC you might do the following.
+
+ `$METRON_HOME/config/batch-profiler.properties`
+ ```
+ profiler.batch.input.format=org.apache.spark.sql.execution.datasources.orc
+ profiler.batch.input.path=hdfs://localhost:9000/apps/metron/indexing/orc/\*/\*
+ ```
+
+1. If additional options are required for your input format, then use the [`--reader`](#--reader) command-line argument when launching the Batch Profiler as [described here](#advanced-usage).
+
## Configuring the Profiler
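
The `--reader` argument documented in the README changes above takes a plain key-value properties file, and the `BatchProfiler.java` changes in this commit hand those properties to the `DataFrameReader` as a string map. The following is a minimal sketch of that conversion, not part of this commit. It uses only the JDK rather than the Guava `Maps.fromProperties` call the commit uses, and the class name `ReaderPropsSketch` and its method names are hypothetical. The `header=true` entry mirrors the CSV case in the integration test.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration class; not part of the Metron code base.
public class ReaderPropsSketch {

    /** Parses text written in the java.util.Properties key=value format. */
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /**
     * Copies string-valued properties into a Map<String, String>, the shape
     * accepted by Spark's DataFrameReader.options(java.util.Map).
     */
    public static Map<String, String> toOptions(Properties props) {
        Map<String, String> options = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            options.put(name, props.getProperty(name));
        }
        return options;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of a file passed with --reader.
        Properties readerProps = parse("header=true\n");
        Map<String, String> options = toOptions(readerProps);
        System.out.println(options);
    }
}
```

When no `--reader` file is given, the commit passes an empty `Properties` object, so the resulting options map is empty and the reader's defaults apply.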
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 587b38c..668ee2c 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -25,6 +25,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <spark_antlr_version>4.7</spark_antlr_version>
</properties>
<dependencies>
<dependency>
@@ -36,12 +37,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${global_spark_version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>antlr-runtime</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>${spark_antlr_version}</version>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
@@ -53,6 +53,12 @@
<artifactId>metron-profiler-client</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index f999613..d75abc3 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -54,25 +55,29 @@ public class BatchProfiler implements Serializable {
* Execute the Batch Profiler.
*
* @param spark The spark session.
- * @param properties The profiler configuration properties.
+ * @param profilerProps The profiler configuration properties.
+ * @param globalProperties The Stellar global properties.
+ * @param readerProps The properties passed to the {@link org.apache.spark.sql.DataFrameReader}.
* @param profiles The profile definitions.
* @return The number of profile measurements produced.
*/
public long run(SparkSession spark,
- Properties properties,
+ Properties profilerProps,
Properties globalProperties,
+ Properties readerProps,
ProfilerConfig profiles) {
LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
Map<String, String> globals = Maps.fromProperties(globalProperties);
- String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class);
- String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);
+ String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
+ String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
LOG.debug("Loading telemetry from '{}'", inputPath);
// fetch the archived telemetry
Dataset<String> telemetry = spark
.read()
+ .options(Maps.fromProperties(readerProps))
.format(inputFormat)
.load(inputPath)
.as(Encoders.STRING());
@@ -85,13 +90,13 @@ public class BatchProfiler implements Serializable {
// build the profiles
Dataset<ProfileMeasurementAdapter> measurements = routes
- .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING())
- .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+ .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
+ .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.bean(ProfileMeasurementAdapter.class));
LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
// write the profile measurements to HBase
long count = measurements
- .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT())
+ .mapPartitions(new HBaseWriterFunction(profilerProps), Encoders.INT())
.agg(sum("value"))
.head()
.getLong(0);
@@ -99,4 +104,4 @@ public class BatchProfiler implements Serializable {
return count;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index bdcf231..29fe4a2 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -37,9 +37,10 @@ import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.CONFIGURATION_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;
/**
@@ -54,7 +55,8 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
* metron-profiler-spark-<version>.jar \
* --config profiler.properties \
* --globals global.properties \
- * --profiles profiles.json
+ * --profiles profiles.json \
+ * --reader reader.properties
* }</pre>
*/
public class BatchProfilerCLI implements Serializable {
@@ -63,6 +65,7 @@ public class BatchProfilerCLI implements Serializable {
public static Properties globals;
public static Properties profilerProps;
+ public static Properties readerProps;
public static ProfilerConfig profiles;
public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
@@ -71,6 +74,7 @@ public class BatchProfilerCLI implements Serializable {
profilerProps = handleProfilerProperties(commandLine);
globals = handleGlobals(commandLine);
profiles = handleProfileDefinitions(commandLine);
+ readerProps = handleReaderProperties(commandLine);
// the batch profiler must use 'event time'
if(!profiles.getTimestampField().isPresent()) {
@@ -88,7 +92,7 @@ public class BatchProfilerCLI implements Serializable {
.getOrCreate();
BatchProfiler profiler = new BatchProfiler();
- long count = profiler.run(spark, profilerProps, globals, profiles);
+ long count = profiler.run(spark, profilerProps, globals, readerProps, profiles);
LOG.info("Profiler produced {} profile measurement(s)", count);
}
@@ -117,13 +121,31 @@ public class BatchProfilerCLI implements Serializable {
*/
private static Properties handleProfilerProperties(CommandLine commandLine) throws IOException {
Properties config = new Properties();
- if(CONFIGURATION_FILE.has(commandLine)) {
- String propertiesPath = CONFIGURATION_FILE.get(commandLine);
+ if(PROFILER_PROPS_FILE.has(commandLine)) {
+ String propertiesPath = PROFILER_PROPS_FILE.get(commandLine);
LOG.info("Loading profiler properties from '{}'", propertiesPath);
config.load(new FileInputStream(propertiesPath));
- LOG.info("Properties = {}", config.toString());
+ LOG.info("Profiler properties = {}", config.toString());
+ }
+ return config;
+ }
+
+ /**
+ * Load the properties for the {@link org.apache.spark.sql.DataFrameReader}.
+ *
+ * @param commandLine The command line.
+ */
+ private static Properties handleReaderProperties(CommandLine commandLine) throws IOException {
+ Properties config = new Properties();
+ if(READER_PROPS_FILE.has(commandLine)) {
+ String readerPropsPath = READER_PROPS_FILE.get(commandLine);
+
+ LOG.info("Loading reader properties from '{}'", readerPropsPath);
+ config.load(new FileInputStream(readerPropsPath));
+
+ LOG.info("Reader properties = {}", config.toString());
}
return config;
}
@@ -171,4 +193,8 @@ public class BatchProfilerCLI implements Serializable {
public static ProfilerConfig getProfiles() {
return profiles;
}
-}
\ No newline at end of file
+
+ public static Properties getReaderProps() {
+ return readerProps;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index f5dfe12..d58728a 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -36,12 +36,12 @@ import java.util.function.Supplier;
public enum BatchProfilerCLIOptions {
PROFILE_DEFN_FILE(() -> {
- Option o = new Option("p", "profiles", true, "Path to a file containing profile definitions.");
+ Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
o.setRequired(true);
return o;
}),
- CONFIGURATION_FILE(() -> {
+ PROFILER_PROPS_FILE(() -> {
Option o = new Option("c", "config", true, "Path to the profiler properties file.");
o.setRequired(false);
return o;
@@ -53,6 +53,12 @@ public enum BatchProfilerCLIOptions {
return o;
}),
+ READER_PROPS_FILE(() -> {
+ Option o = new Option("r", "reader", true, "Path to properties for the DataFrameReader.");
+ o.setRequired(false);
+ return o;
+ }),
+
HELP(() -> {
Option o = new Option("h", "help", false, "Usage instructions.");
o.setRequired(false);
http://git-wip-us.apache.org/repos/asf/metron/blob/1545978e/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 376623c..87c4246 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -30,11 +30,14 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Collections;
@@ -82,8 +85,12 @@ public class BatchProfilerIntegrationTest {
private static String profileJson;
private static SparkSession spark;
private Properties profilerProperties;
+ private Properties readerProperties;
private StellarStatefulExecutor executor;
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
@BeforeClass
public static void setupSpark() {
SparkConf conf = new SparkConf()
@@ -105,12 +112,9 @@ public class BatchProfilerIntegrationTest {
@Before
public void setup() {
+ readerProperties = new Properties();
profilerProperties = new Properties();
- // the input telemetry is read from the local filesystem
- profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
- profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
-
// the output will be written to a mock HBase table
String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
@@ -147,15 +151,80 @@ public class BatchProfilerIntegrationTest {
* produced will center around this date.
*/
@Test
- public void testBatchProfiler() throws Exception {
- // run the batch profiler
+ public void testBatchProfilerWithJSON() throws Exception {
+ // the input telemetry is text/json stored in the local filesystem
+ profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+ profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+ BatchProfiler profiler = new BatchProfiler();
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+ validateProfiles();
+ }
+
+ @Test
+ public void testBatchProfilerWithORC() throws Exception {
+ // re-write the test data as ORC
+ String pathToORC = tempFolder.getRoot().getAbsolutePath();
+ spark.read()
+ .format("text")
+ .load("src/test/resources/telemetry.json")
+ .as(Encoders.STRING())
+ .write()
+ .mode("overwrite")
+ .format("org.apache.spark.sql.execution.datasources.orc")
+ .save(pathToORC);
+
+ // tell the profiler to use the ORC input data
+ profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
+ profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "org.apache.spark.sql.execution.datasources.orc");
+
+ BatchProfiler profiler = new BatchProfiler();
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+ validateProfiles();
+ }
+
+ @Test
+ public void testBatchProfilerWithCSV() throws Exception {
+ // re-write the test data as a CSV with a header record
+ String pathToCSV = tempFolder.getRoot().getAbsolutePath();
+ spark.read()
+ .format("text")
+ .load("src/test/resources/telemetry.json")
+ .as(Encoders.STRING())
+ .write()
+ .mode("overwrite")
+ .option("header", "true")
+ .format("csv")
+ .save(pathToCSV);
+
+ // tell the profiler to use the CSV input data
+ profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
+ profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");
+
+ // set a reader property; tell the reader to expect a header
+ readerProperties.put("header", "true");
+
BatchProfiler profiler = new BatchProfiler();
- profiler.run(spark, profilerProperties, getGlobals(), getProfile());
+ profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+ validateProfiles();
+ }
+
+ /**
+ * Validates the profiles that were built.
+ *
+ * These tests use the Batch Profiler to seed two profiles with archived telemetry. The first profile
+ * called 'count-by-ip', counts the number of messages by 'ip_src_addr'. The second profile called
+ * 'total-count', counts the total number of messages.
+ */
+ private void validateProfiles() {
+ // the max timestamp in the data is around July 7, 2018
+ assign("maxTimestamp", "1530978728982L");
- // validate the measurements written by the batch profiler using `PROFILE_GET`
- // the 'window' looks up to 5 hours before the last timestamp contained in the telemetry
- assign("lastTimestamp", "1530978728982L");
- assign("window", "PROFILE_WINDOW('from 5 hours ago', lastTimestamp)");
+ // the 'window' looks up to 5 hours before the max timestamp
+ assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
// there are 26 messages where ip_src_addr = 192.168.66.1
assertTrue(execute("[26] == PROFILE_GET('count-by-ip', '192.168.66.1', window)", Boolean.class));