Posted to commits@metron.apache.org by ni...@apache.org on 2018/10/09 17:56:38 UTC

metron git commit: METRON-1809 Support Column Oriented Input with Batch Profiler (nickwallen) closes apache/metron#1229

Repository: metron
Updated Branches:
  refs/heads/master e48236672 -> 3467ffdb1


METRON-1809 Support Column Oriented Input with Batch Profiler (nickwallen) closes apache/metron#1229


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3467ffdb
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3467ffdb
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3467ffdb

Branch: refs/heads/master
Commit: 3467ffdb13aad22ab1e4065a0b6715b99371aa23
Parents: e482366
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Oct 9 13:55:45 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Oct 9 13:55:45 2018 -0400

----------------------------------------------------------------------
 .../metron-profiler-spark/README.md             |  53 +++++++--
 .../metron/profiler/spark/BatchProfiler.java    |  21 ++--
 .../profiler/spark/BatchProfilerConfig.java     |   9 +-
 .../reader/ColumnEncodedTelemetryReader.java    |  84 +++++++++++++
 .../profiler/spark/reader/TelemetryReader.java  |  43 +++++++
 .../profiler/spark/reader/TelemetryReaders.java | 110 +++++++++++++++++
 .../reader/TextEncodedTelemetryReader.java      |  83 +++++++++++++
 .../spark/BatchProfilerIntegrationTest.java     |  36 +++++-
 .../ColumnEncodedTelemetryReaderTest.java       | 118 +++++++++++++++++++
 .../spark/function/reader/IsValidJSON.java      |  38 ++++++
 .../function/reader/TelemetryReadersTest.java   |  89 ++++++++++++++
 .../reader/TextEncodedTelemetryReaderTest.java  | 114 ++++++++++++++++++
 12 files changed, 770 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index df143f1..5ed5f4b 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -162,13 +162,13 @@ ${SPARK_HOME}/bin/spark-submit \
 
 The Batch Profiler accepts the following arguments when run from the command line as shown above.  All arguments following the Profiler jar are passed to the Profiler.  All arguments preceding the Profiler jar are passed to Spark.
 
-| Argument         | Description
-|---               |---
-| -p, --profiles   | Path to the profile definitions.
-| -c, --config     | Path to the profiler properties file.
-| -g, --globals    | Path to the Stellar global config file.
-| -r, --reader     | Path to properties for the DataFrameReader.
-| -h, --help       | Print the help text.
+| Argument                              | Description
+|---                                    |---
+| [`-p`, `--profiles`](#--profiles)     | Path to the profile definitions.
+| [`-c`, `--config`](#--config)         | Path to the profiler properties file.
+| [`-g`, `--globals`](#--globals)       | Path to the Stellar global config file.
+| [`-r`, `--reader`](#--reader)         | Path to properties for the DataFrameReader.
+| `-h`, `--help`                        | Print the help text.
 
 #### `--profiles`
 
@@ -234,6 +234,28 @@ The Profiler can consume archived telemetry stored in a variety of input formats
 
 1. If additional options are required for your input format, then use the [`--reader`](#--reader) command-line argument when launching the Batch Profiler as [described here](#advanced-usage).
 
+#### Common Formats
+
+The following examples highlight the configuration values needed to read telemetry stored in common formats.  These values should be defined in the Profiler properties (see [`--config`](#--config)).
+
+##### JSON
+```
+profiler.batch.input.reader=json
+profiler.batch.input.path=/path/to/json/
+```
+
+##### [Apache ORC](https://orc.apache.org/)
+```
+profiler.batch.input.reader=orc
+profiler.batch.input.path=/path/to/orc/
+```
+
+##### [Apache Parquet](http://parquet.apache.org/)
+```
+profiler.batch.input.reader=parquet
+profiler.batch.input.path=/path/to/parquet/
+```
+
 
 ## Configuring the Profiler
 
@@ -244,6 +266,7 @@ You can store both settings for the Profiler along with settings for Spark in th
 | Setting                                                                       | Description
 |---                                                                            |---
 | [`profiler.batch.input.path`](#profilerbatchinputpath)                        | The path to the input data read by the Batch Profiler.
+| [`profiler.batch.input.reader`](#profilerbatchinputreader)                    | The telemetry reader used to read the input data.
 | [`profiler.batch.input.format`](#profilerbatchinputformat)                    | The format of the input data read by the Batch Profiler.
 | [`profiler.batch.input.begin`](#profilerbatchinputbegin)                      | Only messages with a timestamp after this will be profiled.
 | [`profiler.batch.input.end`](#profilerbatchinputend)                          | Only messages with a timestamp before this will be profiled.
@@ -259,11 +282,25 @@ You can store both settings for the Profiler along with settings for Spark in th
 
 The path to the input data read by the Batch Profiler.
 
+### `profiler.batch.input.reader`
+
+*Default*: json
+
+Defines how the input data is treated when read.  The value is not case sensitive so `JSON` and `json` are equivalent.
+
+ * `json`: Read text/json formatted telemetry
+ * `orc`: Read [Apache ORC](https://orc.apache.org/) formatted telemetry
+ * `parquet`: Read [Apache Parquet](http://parquet.apache.org/) formatted telemetry
+ * `text`: Consumes input data stored as raw text. Should be defined along with [`profiler.batch.input.format`](#profilerbatchinputformat). Only use this if the input format is not already covered by a dedicated reader such as `json`.
+ * `columnar`: Consumes input data stored in columnar formats. Should be defined along with [`profiler.batch.input.format`](#profilerbatchinputformat). Only use this if the input format is not already covered by a dedicated reader such as `orc` or `parquet`.
+
+See [Common Formats](#common-formats) for further information.
+
 ### `profiler.batch.input.format`
 
 *Default*: text
 
-The format of the input data read by the Batch Profiler.
+The format of the input data read by the Batch Profiler. This is optional and not required in most cases. For example, this property is not required when [`profiler.batch.input.reader`](#profilerbatchinputreader) is `json`, `orc`, or `parquet`.
 
 ### `profiler.batch.input.begin`
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 39f8b3a..571545e 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -26,6 +26,8 @@ import org.apache.metron.profiler.spark.function.GroupByPeriodFunction;
 import org.apache.metron.profiler.spark.function.HBaseWriterFunction;
 import org.apache.metron.profiler.spark.function.MessageRouterFunction;
 import org.apache.metron.profiler.spark.function.ProfileBuilderFunction;
+import org.apache.metron.profiler.spark.reader.TelemetryReader;
+import org.apache.metron.profiler.spark.reader.TelemetryReaders;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -40,8 +42,7 @@ import java.util.Properties;
 
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_READER;
 import static org.apache.spark.sql.functions.sum;
 
 /**
@@ -54,6 +55,7 @@ public class BatchProfiler implements Serializable {
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private TimestampParser timestampParser;
+  private TelemetryReader reader;
 
   public BatchProfiler() {
     this.timestampParser = new TimestampParser();
@@ -77,17 +79,10 @@ public class BatchProfiler implements Serializable {
 
     LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
     Map<String, String> globals = Maps.fromProperties(globalProperties);
-    String inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
-    String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
-    LOG.debug("Loading telemetry from '{}'", inputPath);
-
-    // fetch the archived telemetry
-    Dataset<String> telemetry = spark
-            .read()
-            .options(Maps.fromProperties(readerProps))
-            .format(inputFormat)
-            .load(inputPath)
-            .as(Encoders.STRING());
+    
+    // fetch the archived telemetry using the input reader
+    TelemetryReader reader = TelemetryReaders.create(TELEMETRY_INPUT_READER.get(profilerProps, String.class));
+    Dataset<String> telemetry = reader.read(spark, profilerProps, readerProps);
     LOG.debug("Found {} telemetry record(s)", telemetry.cache().count());
 
     // find all routes for each message

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
index e8cd160..148d970 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
@@ -25,6 +25,9 @@ import org.apache.metron.stellar.common.utils.ConversionUtils;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.TEXT;
+
 /**
  * Defines the configuration values recognized by the Batch Profiler.
  */
@@ -44,9 +47,11 @@ public enum BatchProfilerConfig {
 
   HBASE_WRITE_DURABILITY("profiler.hbase.durability", Durability.USE_DEFAULT, Durability.class),
 
-  TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "text", String.class),
+  TELEMETRY_INPUT_READER("profiler.batch.input.reader", JSON.toString(), String.class),
+
+  TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "", String.class),
 
-  TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class),
+  TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:8020/apps/metron/indexing/indexed/*/*", String.class),
 
   TELEMETRY_INPUT_BEGIN("profiler.batch.input.begin", "", String.class),
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/ColumnEncodedTelemetryReader.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/ColumnEncodedTelemetryReader.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/ColumnEncodedTelemetryReader.java
new file mode 100644
index 0000000..41bf9b1
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/ColumnEncodedTelemetryReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark.reader;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+
+/**
+ * Reads in a {@link Dataset} then converts all of the {@link Dataset}'s columns
+ * into a single JSON-formatted string.
+ *
+ * <p>This {@link TelemetryReader} is useful for any column-oriented format that
+ * is supported by Spark.  For example, ORC and Parquet.
+ */
+public class ColumnEncodedTelemetryReader implements TelemetryReader {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The input format to use when reading telemetry.
+   */
+  private String inputFormat;
+
+  /**
+   * Creates a {@link ColumnEncodedTelemetryReader}.
+   *
+   * <p>The input format used to read the telemetry is defined by the
+   * BatchProfilerConfig.TELEMETRY_INPUT_FORMAT property.
+   */
+  public ColumnEncodedTelemetryReader() {
+    this.inputFormat = null;
+  }
+
+  /**
+   * Creates a {@link ColumnEncodedTelemetryReader}.
+   *
+   * @param inputFormat The input format to use when reading telemetry.
+   */
+  public ColumnEncodedTelemetryReader(String inputFormat) {
+    this.inputFormat = inputFormat;
+  }
+
+  @Override
+  public Dataset<String> read(SparkSession spark, Properties profilerProps, Properties readerProps) {
+    String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
+    if(inputFormat == null) {
+      inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
+    }
+    LOG.debug("Loading telemetry; inputPath={}, inputFormat={}", inputPath, inputFormat);
+
+    return spark
+            .read()
+            .options(Maps.fromProperties(readerProps))
+            .format(inputFormat)
+            .load(inputPath)
+            .toJSON();
+  }
+}
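
The `.toJSON()` call at the end of `read` is what lets column-oriented input feed a pipeline that expects one JSON string per message. The following is a minimal, Spark-free sketch of that idea only: `RowToJsonSketch`, `quote`, and `toJson` are hypothetical names that do not appear in the commit, and dropping null columns is an assumption rather than a statement about Spark's serializer. It escapes only quotes and backslashes, so it is not a general JSON encoder.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, Spark-free illustration; not part of the commit.
final class RowToJsonSketch {

  // Wraps a string in double quotes, escaping only backslashes and quotes.
  static String quote(String text) {
    return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }

  // Renders one row (column name -> value) as a single JSON object string.
  static String toJson(Map<String, Object> row) {
    StringJoiner json = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, Object> column : row.entrySet()) {
      Object value = column.getValue();
      if (value == null) {
        continue; // assumption: null columns are simply left out
      }
      boolean bare = value instanceof Number || value instanceof Boolean;
      String rendered = bare ? value.toString() : quote(value.toString());
      json.add(quote(column.getKey()) + ":" + rendered);
    }
    return json.toString();
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the column order stable in the output.
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("ip_src_addr", "192.168.1.1");
    row.put("timestamp", 1538000000000L);
    System.out.println(toJson(row));
  }
}
```

Running `main` prints `{"ip_src_addr":"192.168.1.1","timestamp":1538000000000}`, which is the shape of string the text-oriented reader would hand to the same downstream functions.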

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReader.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReader.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReader.java
new file mode 100644
index 0000000..18861c0
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark.reader;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * A {@link TelemetryReader} is responsible for creating a {@link Dataset} containing
+ * telemetry that can be consumed by the {@link org.apache.metron.profiler.spark.BatchProfiler}.
+ */
+public interface TelemetryReader extends Serializable {
+
+  /**
+   * Read in the telemetry
+   *
+   * @param spark The spark session.
+   * @param profilerProps The profiler properties.
+   * @param readerProps The properties specific to reading input data.
+   * @return A {@link Dataset} containing archived telemetry.
+   */
+  Dataset<String> read(SparkSession spark, Properties profilerProps, Properties readerProps);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReaders.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReaders.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReaders.java
new file mode 100644
index 0000000..0646155
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TelemetryReaders.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark.reader;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * Allows a user to easily define the value of the property
+ * {@link org.apache.metron.profiler.spark.BatchProfilerConfig#TELEMETRY_INPUT_READER}.
+ */
+public enum TelemetryReaders implements TelemetryReader {
+
+  /**
+   * A {@link TelemetryReader} that is able to consume text/json formatted data.
+   *
+   * <p>This serves as a configuration short-cut for users. The user only needs to define the
+   * {@link org.apache.metron.profiler.spark.BatchProfilerConfig#TELEMETRY_INPUT_READER} property
+   * with this value to consume text/json.
+   */
+  JSON(() -> new TextEncodedTelemetryReader("text")),
+
+  /**
+   * A {@link TelemetryReader} that is able to consume Apache ORC formatted data.
+   *
+   * <p>This serves as a configuration short-cut for users. The user only needs to define the
+   * {@link org.apache.metron.profiler.spark.BatchProfilerConfig#TELEMETRY_INPUT_READER} property
+   * with this value to consume Apache ORC formatted data.
+   */
+  ORC(() -> new ColumnEncodedTelemetryReader("org.apache.spark.sql.execution.datasources.orc")),
+
+  /**
+   * A {@link TelemetryReader} that is able to consume Apache Parquet formatted data.
+   *
+   * <p>This serves as a configuration short-cut for users. The user only needs to define the
+   * {@link org.apache.metron.profiler.spark.BatchProfilerConfig#TELEMETRY_INPUT_READER} property
+   * with this value to consume Apache Parquet formatted data.
+   */
+  PARQUET(() -> new ColumnEncodedTelemetryReader("parquet")),
+
+  /**
+   * Use a {@link TextEncodedTelemetryReader} by defining the property value as 'TEXT'.
+   */
+  TEXT(() -> new TextEncodedTelemetryReader()),
+
+  /**
+   * Use a {@link ColumnEncodedTelemetryReader} by defining the property value as 'COLUMNAR'.
+   */
+  COLUMNAR(() -> new ColumnEncodedTelemetryReader());
+
+  static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private Supplier<TelemetryReader> supplier;
+
+  private TelemetryReaders(Supplier<TelemetryReader> supplier) {
+    this.supplier = supplier;
+  }
+
+  /**
+   * Returns a {@link TelemetryReader} based on a property value.
+   *
+   * @param propertyValue The property value.
+   * @return A {@link TelemetryReader}
+   * @throws IllegalArgumentException If the property value is invalid.
+   */
+  public static TelemetryReader create(String propertyValue) {
+    LOG.debug("Creating telemetry reader: telemetryReader={}", propertyValue);
+    TelemetryReader reader = null;
+    try {
+      String key = StringUtils.upperCase(propertyValue);
+      TelemetryReaders strategy = TelemetryReaders.valueOf(key);
+      reader = strategy.supplier.get();
+
+    } catch(IllegalArgumentException e) {
+      LOG.error("Unexpected telemetry reader: telemetryReader=" + propertyValue, e);
+      throw e;
+    }
+
+    return reader;
+  }
+
+  @Override
+  public Dataset<String> read(SparkSession spark, Properties profilerProps, Properties readerProps) {
+    return supplier.get().read(spark, profilerProps, readerProps);
+  }
+}
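
The enum above pairs each accepted property value with a `Supplier`, so a fresh reader is built on every lookup and the lookup itself is case-insensitive. The sketch below reproduces only that pattern using the standard library. `SketchReader`, `FormatReader`, and `SketchReaders` are hypothetical stand-ins, and `describe()` replaces the real `read(...)` so no Spark dependency is needed. One deliberate difference, labeled in the code: the sketch rejects a null value with an `IllegalArgumentException`, whereas in the committed `create` a null value would appear to reach `valueOf` and fail with a `NullPointerException` that the `catch` block does not cover.

```java
import java.util.Locale;
import java.util.function.Supplier;

// Hypothetical stand-in for TelemetryReader; describes itself instead of reading data.
interface SketchReader {
  String describe();
}

// Hypothetical reader that records its kind and the format it was pinned to.
final class FormatReader implements SketchReader {
  private final String kind;
  private final String format; // null means "take the format from configuration"

  FormatReader(String kind, String format) {
    this.kind = kind;
    this.format = format;
  }

  @Override
  public String describe() {
    return kind + ":" + (format == null ? "<configured>" : format);
  }
}

// Each constant maps a property value to a supplier of a new reader.
enum SketchReaders {
  JSON(() -> new FormatReader("text", "text")),
  PARQUET(() -> new FormatReader("columnar", "parquet")),
  TEXT(() -> new FormatReader("text", null)),
  COLUMNAR(() -> new FormatReader("columnar", null));

  private final Supplier<SketchReader> supplier;

  SketchReaders(Supplier<SketchReader> supplier) {
    this.supplier = supplier;
  }

  // Case-insensitive lookup; unknown names raise IllegalArgumentException via valueOf.
  static SketchReader create(String propertyValue) {
    if (propertyValue == null) {
      // assumption of this sketch: treat a missing value like an invalid one
      throw new IllegalArgumentException("telemetry reader is not defined");
    }
    String key = propertyValue.trim().toUpperCase(Locale.ROOT);
    return valueOf(key).supplier.get();
  }
}

final class SketchReadersDemo {
  public static void main(String[] args) {
    System.out.println(SketchReaders.create("json").describe());
    System.out.println(SketchReaders.create("Columnar").describe());
  }
}
```

Holding a `Supplier` rather than a reader instance matters here because the committed readers cache the resolved input format in a field, so sharing one instance across lookups would also share that state.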

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TextEncodedTelemetryReader.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TextEncodedTelemetryReader.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TextEncodedTelemetryReader.java
new file mode 100644
index 0000000..1f10b97
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/reader/TextEncodedTelemetryReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark.reader;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+
+/**
+ * A {@link TelemetryReader} that consumes telemetry stored as raw text.
+ *
+ * <p>This {@link TelemetryReader} is useful for any text-encoded formats like JSON and CSV.
+ */
+public class TextEncodedTelemetryReader implements TelemetryReader {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The input format to use when reading telemetry.
+   */
+  private String inputFormat;
+
+  /**
+   * Creates a {@link TextEncodedTelemetryReader}.
+   *
+   * <p>The input format used to read the telemetry is defined by the
+   * BatchProfilerConfig.TELEMETRY_INPUT_FORMAT property.
+   */
+  public TextEncodedTelemetryReader() {
+    this.inputFormat = null;
+  }
+
+  /**
+   * Creates a {@link TextEncodedTelemetryReader}.
+   *
+   * @param inputFormat The input format to use when reading telemetry.
+   */
+  public TextEncodedTelemetryReader(String inputFormat) {
+    this.inputFormat = inputFormat;
+  }
+
+  @Override
+  public Dataset<String> read(SparkSession spark, Properties profilerProps, Properties readerProps) {
+    String inputPath = TELEMETRY_INPUT_PATH.get(profilerProps, String.class);
+    if(inputFormat == null) {
+      inputFormat = TELEMETRY_INPUT_FORMAT.get(profilerProps, String.class);
+    }
+    LOG.debug("Loading telemetry; inputPath={}, inputFormat={}", inputPath, inputFormat);
+
+    return spark
+            .read()
+            .options(Maps.fromProperties(readerProps))
+            .format(inputFormat)
+            .load(inputPath)
+            .as(Encoders.STRING());
+  }
+}
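
Both reader classes in this commit resolve the input format the same way: a format passed to the constructor wins, and otherwise the value of `profiler.batch.input.format` is used. A minimal sketch of that precedence rule follows. `InputFormatSketch` and `resolve` are hypothetical names, the property is read with plain `java.util.Properties` instead of `BatchProfilerConfig`, and the empty-string fallback mirrors the default that this commit assigns to `TELEMETRY_INPUT_FORMAT`.

```java
import java.util.Properties;

// Hypothetical illustration of the format precedence used by the readers.
final class InputFormatSketch {

  static final String FORMAT_KEY = "profiler.batch.input.format";

  // A pinned (constructor-supplied) format takes precedence over configuration.
  static String resolve(String pinnedFormat, Properties profilerProps) {
    if (pinnedFormat != null) {
      return pinnedFormat;
    }
    // Falls back to the configured format, or "" when none is defined.
    return profilerProps.getProperty(FORMAT_KEY, "");
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(FORMAT_KEY, "csv");

    // 'json' reader: pinned to "text", so the configured value is ignored.
    System.out.println(resolve("text", props));

    // 'text' reader: nothing pinned, so the configured "csv" is used.
    System.out.println(resolve(null, props));
  }
}
```

This is why the CSV case in the integration test sets both the reader (`text`) and the format (`csv`), while the JSON, ORC, and Parquet cases set only the reader.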

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index c33644f..83800af 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -58,8 +58,11 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_READER;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.*;
+
 /**
  * An integration test for the {@link BatchProfiler}.
  */
@@ -159,8 +162,8 @@ public class BatchProfilerIntegrationTest {
   @Test
   public void testBatchProfilerWithJSON() throws Exception {
     // the input telemetry is text/json stored in the local filesystem
+    profilerProperties.put(TELEMETRY_INPUT_READER.getKey(), JSON.toString());
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
-    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
 
     BatchProfiler profiler = new BatchProfiler();
     profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
@@ -170,20 +173,41 @@ public class BatchProfilerIntegrationTest {
 
   @Test
   public void testBatchProfilerWithORC() throws Exception {
-    // re-write the test data as ORC
+    // re-write the test data as column-oriented ORC
     String pathToORC = tempFolder.getRoot().getAbsolutePath();
     spark.read()
-            .format("text")
+            .format("json")
             .load("src/test/resources/telemetry.json")
-            .as(Encoders.STRING())
             .write()
             .mode("overwrite")
             .format("org.apache.spark.sql.execution.datasources.orc")
             .save(pathToORC);
 
     // tell the profiler to use the ORC input data
+    profilerProperties.put(TELEMETRY_INPUT_READER.getKey(), ORC.toString());
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
-    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "org.apache.spark.sql.execution.datasources.orc");
+
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
+
+    validateProfiles();
+  }
+
+  @Test
+  public void testBatchProfilerWithParquet() throws Exception {
+    // re-write the test data as column-oriented Parquet
+    String inputPath = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("json")
+            .load("src/test/resources/telemetry.json")
+            .write()
+            .mode("overwrite")
+            .format("parquet")
+            .save(inputPath);
+
+    // tell the profiler to use the Parquet input data
+    profilerProperties.put(TELEMETRY_INPUT_READER.getKey(), PARQUET.toString());
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), inputPath);
 
     BatchProfiler profiler = new BatchProfiler();
     profiler.run(spark, profilerProperties, getGlobals(), readerProperties, getProfile());
@@ -206,7 +230,9 @@ public class BatchProfilerIntegrationTest {
             .save(pathToCSV);
 
     // tell the profiler to use the CSV input data
+    // CSV is an example of needing to define both the reader and the input format
     profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
+    profilerProperties.put(TELEMETRY_INPUT_READER.getKey(), "text");
     profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");
 
     // set a reader property; tell the reader to expect a header

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/ColumnEncodedTelemetryReaderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/ColumnEncodedTelemetryReaderTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/ColumnEncodedTelemetryReaderTest.java
new file mode 100644
index 0000000..0bcfb3f
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/ColumnEncodedTelemetryReaderTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function.reader;
+
+import org.apache.metron.profiler.spark.reader.TelemetryReaders;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+
+/**
+ * Tests the {@link org.apache.metron.profiler.spark.reader.ColumnEncodedTelemetryReader} class.
+ */
+public class ColumnEncodedTelemetryReaderTest {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+  private static SparkSession spark;
+  private Properties profilerProperties;
+  private Properties readerProperties;
+
+  @BeforeClass
+  public static void setupSpark() {
+    SparkConf conf = new SparkConf()
+            .setMaster("local")
+            .setAppName("ColumnEncodedTelemetryReaderTest")
+            .set("spark.sql.shuffle.partitions", "8");
+    spark = SparkSession
+            .builder()
+            .config(conf)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void tearDownSpark() {
+    if(spark != null) {
+      spark.close();
+    }
+  }
+
+  @Before
+  public void setup() {
+    readerProperties = new Properties();
+    profilerProperties = new Properties();
+  }
+
+  @Test
+  public void testParquet() {
+    // re-write the test data as column-oriented Parquet
+    String inputPath = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("json")
+            .load("src/test/resources/telemetry.json")
+            .write()
+            .mode("overwrite")
+            .format("parquet")
+            .save(inputPath);
+
+    // tell the profiler to use the Parquet input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), inputPath);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "parquet");
+
+    // set a reader property; tell the reader to expect a header
+    readerProperties.put("header", "true");
+
+    // there should be 100 valid JSON records
+    Dataset<String> telemetry = TelemetryReaders.COLUMNAR.read(spark, profilerProperties, readerProperties);
+    Assert.assertEquals(100, telemetry.filter(new IsValidJSON()).count());
+  }
+
+  @Test
+  public void testORC() {
+    // re-write the test data as column-oriented ORC
+    String pathToORC = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("json")
+            .load("src/test/resources/telemetry.json")
+            .write()
+            .mode("overwrite")
+            .format("org.apache.spark.sql.execution.datasources.orc")
+            .save(pathToORC);
+
+    // tell the profiler to use the ORC input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToORC);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "org.apache.spark.sql.execution.datasources.orc");
+
+    // there should be 100 valid JSON records
+    Dataset<String> telemetry = TelemetryReaders.COLUMNAR.read(spark, profilerProperties, readerProperties);
+    Assert.assertEquals(100, telemetry.filter(new IsValidJSON()).count());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/IsValidJSON.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/IsValidJSON.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/IsValidJSON.java
new file mode 100644
index 0000000..50144f9
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/IsValidJSON.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function.reader;
+
+import org.apache.spark.api.java.function.FilterFunction;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+/**
+ * A filter function that keeps JSON records with at least 32 fields; malformed JSON throws.
+ */
+public class IsValidJSON implements FilterFunction<String> {
+
+  @Override
+  public boolean call(String text) throws Exception {
+    JSONParser parser = new JSONParser();
+    JSONObject json = (JSONObject) parser.parse(text);
+
+    // all of the test data has at least 32 fields in each JSON record
+    return json.keySet().size() >= 32;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TelemetryReadersTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TelemetryReadersTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TelemetryReadersTest.java
new file mode 100644
index 0000000..e525ae0
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TelemetryReadersTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function.reader;
+
+import org.apache.metron.profiler.spark.reader.ColumnEncodedTelemetryReader;
+import org.apache.metron.profiler.spark.reader.TelemetryReaders;
+import org.apache.metron.profiler.spark.reader.TextEncodedTelemetryReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.COLUMNAR;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
+import static org.apache.metron.profiler.spark.reader.TelemetryReaders.TEXT;
+
+public class TelemetryReadersTest {
+
+  @Test
+  public void testJsonReader() {
+    String key = JSON.toString();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof TextEncodedTelemetryReader);
+  }
+
+  @Test
+  public void testJsonReaderLowerCase() {
+    String key = JSON.toString().toLowerCase();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof TextEncodedTelemetryReader);
+  }
+
+  @Test
+  public void testOrcReader() {
+    String key = ORC.toString();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof ColumnEncodedTelemetryReader);
+  }
+
+
+  @Test
+  public void testOrcReaderLowerCase() {
+    String key = ORC.toString().toLowerCase();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof ColumnEncodedTelemetryReader);
+  }
+
+  @Test
+  public void testParquetReader() {
+    String key = PARQUET.toString();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof ColumnEncodedTelemetryReader);
+  }
+
+  @Test
+  public void testParquetReaderLowerCase() {
+    String key = PARQUET.toString().toLowerCase();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof ColumnEncodedTelemetryReader);
+  }
+
+  @Test
+  public void testTextReader() {
+    String key = TEXT.toString();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof TextEncodedTelemetryReader);
+  }
+
+  @Test
+  public void testColumnReader() {
+    String key = COLUMNAR.toString();
+    Assert.assertTrue(TelemetryReaders.create(key) instanceof ColumnEncodedTelemetryReader);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidReader() {
+    TelemetryReaders.create("invalid");
+    Assert.fail("exception expected");
+  }
+}
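
The tests above show that reader names resolve regardless of case and that an unknown name raises IllegalArgumentException. A minimal sketch of one way such a lookup could behave is given below. It is not the Metron implementation: the ReaderKind enum, its family() accessor, and the lookup() method are hypothetical names invented for illustration, and the "text"/"columnar" strings merely stand in for the two reader classes.

```java
import java.util.Locale;

// Hypothetical stand-in for a reader factory; NOT the Metron TelemetryReaders class.
enum ReaderKind {
  TEXT("text"),
  JSON("text"),
  COLUMNAR("columnar"),
  ORC("columnar"),
  PARQUET("columnar");

  // which kind of reader (text-encoded or column-encoded) backs this name
  private final String family;

  ReaderKind(String family) {
    this.family = family;
  }

  String family() {
    return family;
  }

  // Resolves a reader name without regard to case.
  // Enum.valueOf throws IllegalArgumentException when no constant matches.
  static ReaderKind lookup(String name) {
    return ReaderKind.valueOf(name.trim().toUpperCase(Locale.ROOT));
  }
}
```

Under these assumptions, lookup("orc") and lookup("ORC") both resolve to the same constant, while lookup("invalid") throws, which mirrors what testOrcReaderLowerCase and testInvalidReader expect of the real factory.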

http://git-wip-us.apache.org/repos/asf/metron/blob/3467ffdb/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TextEncodedTelemetryReaderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TextEncodedTelemetryReaderTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TextEncodedTelemetryReaderTest.java
new file mode 100644
index 0000000..3b26bb9
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/reader/TextEncodedTelemetryReaderTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function.reader;
+
+import org.apache.metron.profiler.spark.reader.TelemetryReaders;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+
+/**
+ * Tests the {@link org.apache.metron.profiler.spark.reader.TextEncodedTelemetryReader} class.
+ */
+public class TextEncodedTelemetryReaderTest {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+  private static SparkSession spark;
+  private Properties profilerProperties;
+  private Properties readerProperties;
+
+  @BeforeClass
+  public static void setupSpark() {
+    SparkConf conf = new SparkConf()
+            .setMaster("local")
+            .setAppName("TextEncodedTelemetryReaderTest")
+            .set("spark.sql.shuffle.partitions", "8");
+    spark = SparkSession
+            .builder()
+            .config(conf)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void tearDownSpark() {
+    if(spark != null) {
+      spark.close();
+    }
+  }
+
+  @Before
+  public void setup() {
+    readerProperties = new Properties();
+    profilerProperties = new Properties();
+  }
+
+  @Test
+  public void testCSV() {
+    // re-write the test data as a CSV with a header record
+    String pathToCSV = tempFolder.getRoot().getAbsolutePath();
+    spark.read()
+            .format("text")
+            .load("src/test/resources/telemetry.json")
+            .as(Encoders.STRING())
+            .write()
+            .mode("overwrite")
+            .option("header", "true")
+            .format("csv")
+            .save(pathToCSV);
+
+    // tell the profiler to use the CSV input data
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");
+
+    // set a reader property; tell the reader to expect a header
+    readerProperties.put("header", "true");
+
+    // there should be 100 valid JSON records
+    Dataset<String> telemetry = TelemetryReaders.TEXT.read(spark, profilerProperties, readerProperties);
+    Assert.assertEquals(100, telemetry.filter(new IsValidJSON()).count());
+  }
+
+  @Test
+  public void testJSON() {
+    // use the test telemetry that is stored as raw json
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    // set a reader property; tell the reader to expect a header
+    readerProperties.put("header", "true");
+
+    // there should be 100 valid JSON records
+    Dataset<String> telemetry = TelemetryReaders.TEXT.read(spark, profilerProperties, readerProperties);
+    Assert.assertEquals(100, telemetry.filter(new IsValidJSON()).count());
+  }
+}