Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/23 21:58:56 UTC

[4/4] metron git commit: METRON-1707 Port Profiler to Spark (nickwallen) closes apache/metron#1150

METRON-1707 Port Profiler to Spark (nickwallen) closes apache/metron#1150


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bfbf018
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bfbf018
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bfbf018

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 3bfbf018a9c3e1c74dc934901446b5111a0ada03
Parents: 6fb50a1
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Aug 23 17:58:18 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Aug 23 17:58:18 2018 -0400

----------------------------------------------------------------------
 dependencies_with_url.csv                       |  64 +++++-
 .../profiler/DefaultMessageDistributor.java     |   1 -
 .../apache/metron/profiler/MessageRoute.java    |  10 +-
 metron-analytics/metron-profiler-spark/pom.xml  | 195 +++++++++++++++++++
 .../metron/profiler/spark/BatchProfiler.java    | 102 ++++++++++
 .../profiler/spark/BatchProfilerConfig.java     | 190 ++++++++++++++++++
 .../spark/ProfileMeasurementAdapter.java        | 132 +++++++++++++
 .../spark/function/GroupByPeriodFunction.java   |  60 ++++++
 .../spark/function/HBaseWriterFunction.java     | 171 ++++++++++++++++
 .../spark/function/MessageRouterFunction.java   | 113 +++++++++++
 .../spark/function/ProfileBuilderFunction.java  | 107 ++++++++++
 .../profiler/spark/function/TaskUtils.java      |  41 ++++
 .../spark/BatchProfilerIntegrationTest.java     | 111 +++++++++++
 .../spark/function/HBaseWriterFunctionTest.java | 176 +++++++++++++++++
 .../function/MessageRouterFunctionTest.java     | 114 +++++++++++
 .../function/ProfileBuilderFunctionTest.java    |  98 ++++++++++
 .../src/test/resources/log4j.properties         |  31 +++
 .../src/test/resources/telemetry.json           | 100 ++++++++++
 metron-analytics/pom.xml                        |   1 +
 .../configuration/profiler/ProfileResult.java   |   4 +
 .../profiler/ProfileResultExpressions.java      |   4 +
 pom.xml                                         |   1 +
 22 files changed, 1822 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 6ac1f23..6b4385b 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -33,13 +33,18 @@ com.maxmind.geoip2:geoip2:jar:2.8.0:compile,Apache v2,https://github.com/maxmind
 com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile,CDDL,http://jaxb.java.net/
 com.sun.xml.bind:jaxb-impl:jar:2.2.5-2:compile,CDDL,http://jaxb.java.net/
 com.twitter:jsr166e:jar:1.1.0:compile,CC0 1.0 Universal,http://github.com/twitter/jsr166e
+com.twitter:chill-java:jar:0.8.4:compile,ASLv2,https://github.com/twitter/chill
+com.twitter:chill_2.11:jar:0.8.4:compile,ASLv2,https://github.com/twitter/chill
 it.unimi.dsi:fastutil:jar:7.0.6:compile,ASLv2,https://github.com/vigna/fastutil
 javassist:javassist:jar:3.12.1.GA:compile,Apache v2,http://www.javassist.org/
 javax.activation:activation:jar:1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp
+javax.activation:activation:jar:1.1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp
 javax.annotation:jsr250-api:jar:1.0:compile,COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0,http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html
 javax.annotation:javax.annotation-api:jar:1.3.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
+javax.annotation:javax.annotation-api:jar:1.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
 javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
 javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net
+javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile,CDDL 1.1,https://github.com/jax-rs/api
 javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/
 javax.xml.bind:jaxb-api:jar:2.2.2:compile,CDDL,https://jaxb.dev.java.net/
 javax.xml.bind:jaxb-api:jar:2.3.0:compile,CDDL,https://jaxb.dev.java.net/
@@ -47,25 +52,41 @@ javax.xml.stream:stax-api:jar:1.0-2:compile,COMMON DEVELOPMENT AND DISTRIBUTION
 jline:jline:jar:0.9.94:compile,BSD,http://jline.sourceforge.net
 junit:junit:jar:4.12:compile,Eclipse Public License 1.0,http://junit.org
 junit:junit:jar:4.4:compile,Common Public License Version 1.0,http://junit.org
+net.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite
 net.sf.jopt-simple:jopt-simple:jar:3.2:compile,The MIT License,http://jopt-simple.sourceforge.net
 net.sf.jopt-simple:jopt-simple:jar:4.9:compile,The MIT License,http://jopt-simple.sourceforge.net
 net.sf.saxon:Saxon-HE:jar:9.5.1-5:compile,Mozilla Public License Version 2.0,http://www.saxonica.com/
 org.abego.treelayout:org.abego.treelayout.core:jar:1.0.1:compile,BSD 3-Clause "New" or "Revised" License (BSD-3-Clause),http://code.google.com/p/treelayout/
 org.adrianwalker:multiline-string:jar:0.1.2:compile,Common Public License Version 1.0,https://github.com/benelog/multiline
 org.antlr:antlr4-runtime:jar:4.5:compile,BSD 3-Clause License,http://www.antlr.org
+org.bouncycastle:bcprov-jdk15on:jar:1.52:compile,MIT,https://www.bouncycastle.org/license.html
 org.clojure:clojure:jar:1.6.0:compile,Eclipse Public License 1.0,http://clojure.org/
 org.clojure:clojure:jar:1.7.0:compile,Eclipse Public License 1.0,http://clojure.org/
 org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-jaxrs:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-xc:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-xc:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
+org.codehaus.janino:commons-compiler:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
+org.codehaus.janino:janino:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
 org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
+org.json4s:json4s-ast_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
+org.json4s:json4s-core_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
+org.json4s:json4s-jackson_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
 org.jruby.jcodings:jcodings:jar:1.0.8:compile,MIT License,https://github.com/jruby/jcodings
 org.jruby.joni:joni:jar:2.1.2:compile,MIT License,https://github.com/jruby/joni
+org.lz4:lz4-java:jar:1.4.0:compile,ASLv2,https://github.com/lz4/lz4-java
 org.mitre.taxii:taxii:jar:1.1.0.1:compile,The BSD 3-Clause License,https://github.com/TAXIIProject/java-taxii
 org.mitre:stix:jar:1.2.0.2:compile,The BSD 3-Clause License,https://github.com/STIXProject/java-stix
 org.mockito:mockito-core:jar:1.10.19:compile,The MIT License,http://www.mockito.org
+org.roaringbitmap:RoaringBitmap:jar:0.5.11:compile,ASLv2,https://github.com/RoaringBitmap/RoaringBitmap
 org.scala-lang:scala-library:jar:2.10.6:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-compiler:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-library:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-reflect:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scalap:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
+oro:oro:jar:2.0.8:compile,ASLv2,http://attic.apache.org/projects/jakarta-oro.html
 xmlenc:xmlenc:jar:0.52:compile,The BSD License,http://xmlenc.sourceforge.net
 asm:asm:jar:3.1:compile,BSD,http://asm.ow2.org/
 com.sun.jersey.contribs:jersey-guice:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
@@ -103,7 +124,10 @@ org.slf4j:slf4j-log4j12:jar:1.7.5:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-simple:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:jcl-over-slf4j:jar:1.7.7:compile,MIT,http://www.slf4j.org
+org.slf4j:jcl-over-slf4j:jar:1.7.16:compile,MIT,http://www.slf4j.org
 org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:jul-to-slf4j:jar:1.7.16:compile,MIT,http://www.slf4j.org
 org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
 org.slf4j:jul-to-slf4j:jar:1.7.25:compile,MIT,http://www.slf4j.org
 aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net
@@ -113,7 +137,9 @@ com.github.tony19:named-regexp:jar:0.2.3:compile,Apache License, Version 2.0,
 com.google.code.findbugs:jsr305:jar:1.3.9:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
 com.google.code.findbugs:jsr305:jar:3.0.0:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
 com.google.code.findbugs:annotations:jar:2.0.1:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
-com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,
+com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,https://github.com/carrotsearch/hppc
+com.carrotsearch:hppc:jar:0.7.2:compile,ASLv2,https://github.com/carrotsearch/hppc
+com.clearspring.analytics:stream:jar:2.7.0:compile,ASLv2,https://github.com/addthis/stream-lib
 com.clearspring.analytics:stream:jar:2.9.5:compile,ASLv2,https://github.com/addthis/stream-lib
 com.codahale.metrics:metrics-core:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
 com.codahale.metrics:metrics-graphite:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
@@ -132,6 +158,7 @@ com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.c
 com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
 com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -147,6 +174,8 @@ com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.9.5:compile,ASLv2,htt
 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
 com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
 com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
+com.fasterxml.jackson.module:jackson-module-paranamer:jar:2.7.9:compile,ASLv2,https://github.com/FasterXML/jackson-modules-base
+com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.6.7.1:compile,ASLv2,https://github.com/FasterXML/jackson-module-scala
 com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
 com.fasterxml:classmate:jar:1.3.4:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
 com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
@@ -164,10 +193,14 @@ com.lmax:disruptor:jar:3.3.2:compile,The Apache Software License, Version 2.0,ht
 com.googlecode.json-simple:json-simple:jar:1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/
 com.googlecode.json-simple:json-simple:jar:1.1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/
 com.jamesmurty.utils:java-xmlbuilder:jar:0.4:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/
+com.jamesmurty.utils:java-xmlbuilder:jar:1.1:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/
 com.ning:compress-lzf:jar:1.0.2:compile,Apache License 2.0,http://github.com/ning/compress
+com.ning:compress-lzf:jar:1.0.3:compile,Apache License 2.0,http://github.com/ning/compress
 com.opencsv:opencsv:jar:3.7:compile,Apache 2,http://opencsv.sf.net
 com.spatial4j:spatial4j:jar:0.5:compile,The Apache Software License, Version 2.0,
 com.tdunning:t-digest:jar:3.0:compile,The Apache Software License, Version 2.0,https://github.com/tdunning/t-digest
+com.univocity:univocity-parsers:jar:2.5.9:compile,ASLv2,https://github.com/uniVocity/univocity-parsers
+com.vlkan:flatbuffers:jar:1.2.0-3f79e055:compile,ASLv2,https://github.com/vy/flatbuffers
 com.yammer.metrics:metrics-core:jar:2.2.0:compile,ASLv2,
 commons-beanutils:commons-beanutils-core:jar:1.8.0:compile,ASLv2,http://commons.apache.org/beanutils/
 commons-beanutils:commons-beanutils-core:jar:1.8.0:provided,ASLv2,http://commons.apache.org/beanutils/
@@ -203,19 +236,28 @@ commons-lang:commons-lang:jar:2.6:provided,ASLv2,http://commons.apache.org/lang/
 commons-logging:commons-logging:jar:1.1.1:compile,ASLv2,http://commons.apache.org/logging
 commons-logging:commons-logging:jar:1.1.3:compile,ASLv2,http://commons.apache.org/proper/commons-logging/
 commons-logging:commons-logging:jar:1.2:compile,ASLv2,http://commons.apache.org/proper/commons-logging/
+commons-net:commons-net:jar:2.2:compile,ASLv2,http://commons.apache.org/net/
 commons-net:commons-net:jar:3.1:compile,ASLv2,http://commons.apache.org/net/
 commons-net:commons-net:jar:3.1:provided,ASLv2,http://commons.apache.org/net/
 commons-text:commons-text:jar:1.1:compile,ASLv2,http://commons.apache.org/proper/commons-text/
 commons-validator:commons-validator:jar:1.4.0:compile,ASLv2,http://commons.apache.org/validator/
 commons-validator:commons-validator:jar:1.5.1:compile,ASLv2,http://commons.apache.org/proper/commons-validator/
 commons-validator:commons-validator:jar:1.6:compile,ASLv2,http://commons.apache.org/proper/commons-validator/
+et.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite
+io.airlift:aircompressor:jar:0.8:compile,ASLv2,https://github.com/airlift/aircompressor
 io.confluent:kafka-avro-serializer:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
 io.confluent:kafka-schema-registry-client:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
+io.dropwizard.metrics:metrics-core:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
 io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,
 io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,
-io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/
+io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,
 io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/
 io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/
+io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/
+io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/
 io.thekraken:grok:jar:0.1.0:compile,Apache License, Version 2.0,http://maven.apache.org
 javax.inject:javax.inject:jar:1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/atinject/
 joda-time:joda-time:jar:2.3:compile,Apache 2,http://www.joda.org/joda-time/
@@ -224,9 +266,12 @@ joda-time:joda-time:jar:2.9.9:compile,Apache 2,http://www.joda.org/joda-time/
 log4j:log4j:jar:1.2.15:compile,The Apache Software License, Version 2.0,http://logging.apache.org:80/log4j/1.2/
 log4j:log4j:jar:1.2.16:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/
 log4j:log4j:jar:1.2.17:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/
+net.iharder:base64:jar:2.3.8:compile,Public Domain,http://iharder.sourceforge.net/current/java/base64/
 net.java.dev.jets3t:jets3t:jar:0.9.0:compile,Apache License, Version 2.0,http://www.jets3t.org
+net.java.dev.jets3t:jets3t:jar:0.9.4:compile,Apache License, Version 2.0,http://www.jets3t.org
 net.jpountz.lz4:lz4:jar:1.2.0:compile,The Apache Software License, Version 2.0,https://github.com/jpountz/lz4-java
 net.jpountz.lz4:lz4:jar:1.3.0:compile,The Apache Software License, Version 2.0,https://github.com/jpountz/lz4-java
+net.sf.py4j:py4j:jar:0.10.7:compile,,
 nl.jqno.equalsverifier:equalsverifier:jar:2.0.2:compile,The Apache Software License, Version 2.0,http://www.jqno.nl/equalsverifier
 org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
@@ -354,6 +399,7 @@ org.springframework.security:spring-security-core:jar:4.1.3.RELEASE:compile,ASLv
 org.springframework.security:spring-security-core:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
 org.springframework.security:spring-security-web:jar:4.1.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
 org.springframework.security:spring-security-web:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
+org.spark-project.spark:unused:jar:1.0.0:compile,ASLv2,https://spark.apache.org
 antlr:antlr:jar:2.7.7:compile,BSD 3-Clause License,http://www.antlr2.org
 com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/license.html
 com.h2database:h2:jar:1.4.197:compile,EPL 1.0,http://www.h2database.com/html/license.html
@@ -370,6 +416,7 @@ org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://g
 org.springframework.kafka:spring-kafka:jar:2.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka
 ch.hsr:geohash:jar:1.3.0:compile,ASLv2,https://github.com/kungfoo/geohash-java
 org.locationtech.spatial4j:spatial4j:jar:0.6:compile,ASLv2,https://github.com/locationtech/spatial4j
+com.github.luben:zstd-jni:jar:1.3.2-2:compile,BSD,https://github.com/luben/zstd-jni
 com.github.spullara.mustache.java:compiler:jar:0.9.3:compile,ASLv2,https://github.com/spullara/mustache.java/blob/master/LICENSE
 io.netty:netty-buffer:jar:4.1.13.Final:compile,ASLv2,http://netty.io/
 io.netty:netty-codec-http:jar:4.1.13.Final:compile,ASLv2,http://netty.io/
@@ -395,6 +442,19 @@ org.elasticsearch:securesm:jar:1.1:compile,ASLv2,https://github.com/elastic/elas
 org.hdrhistogram:HdrHistogram:jar:2.1.9:compile,BSD,https://github.com/HdrHistogram/HdrHistogram/blob/master/LICENSE.txt
 com.trendmicro:tlsh:jar:3.7.1:compile,ASLv2,https://github.com/trendmicro/tlsh
 org.glassfish:javax.json:jar:1.0.4:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/jsonp
+org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2.external:javax.inject:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:hk2-api:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:hk2-locator:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:hk2-utils:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.22.2:compile
+org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.22.2:compile
+org.glassfish.jersey.containers:jersey-container-servlet:jar:2.22.2:compile
+org.glassfish.jersey.core:jersey-client:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.core:jersey-common:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.core:jersey-server:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.media:jersey-media-jaxb:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
 org.eclipse.persistence:javax.persistence:jar:2.1.1:compile,EPL 1.0,http://www.eclipse.org/eclipselink
 org.eclipse.persistence:org.eclipse.persistence.antlr:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
 org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index d950b07..673072b 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -28,7 +28,6 @@ import com.google.common.cache.RemovalNotification;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.stellar.dsl.Context;
-import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
index e76b897..7cdb607 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -22,11 +22,11 @@ package org.apache.metron.profiler;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.json.simple.JSONObject;
 
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * Defines the 'route' a message must take through the Profiler.
@@ -74,6 +74,10 @@ public class MessageRoute implements Serializable {
     this.timestamp = timestamp;
   }
 
+  public MessageRoute() {
+    // necessary for serialization
+  }
+
   public String getEntity() {
     return entity;
   }
@@ -98,6 +102,10 @@ public class MessageRoute implements Serializable {
     this.message = message;
   }
 
+  public void setMessage(Map message) {
+    this.message = new JSONObject(message);
+  }
+
   public Long getTimestamp() {
     return timestamp;
   }
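
The no-arg constructor and the Map-based setMessage(...) overload added above exist so that Spark's bean encoder can instantiate and populate MessageRoute reflectively. A minimal sketch of obtaining such an encoder (only MessageRoute and Spark's Encoders API are taken from this patch; the class name MessageRouteEncoderExample is illustrative):

import org.apache.metron.profiler.MessageRoute;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class MessageRouteEncoderExample {

  // Encoders.bean(...) requires a public no-arg constructor and getter/setter
  // pairs, which the changes in this hunk provide for MessageRoute.
  public static Encoder<MessageRoute> routeEncoder() {
    return Encoders.bean(MessageRoute.class);
  }
}

The Batch Profiler below uses exactly this encoder when flat-mapping telemetry into routes.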

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
new file mode 100644
index 0000000..93ce08a
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-analytics</artifactId>
+        <version>0.5.1</version>
+    </parent>
+    <artifactId>metron-profiler-spark</artifactId>
+    <url>https://metron.apache.org/</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-core_2.11</artifactId>
+          <version>${global_spark_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>${global_spark_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr-runtime</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-profiler-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.storm</groupId>
+                    <artifactId>storm-hbase</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <!-- allows profiles to use the Stellar stats functions -->
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-statistics</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${global_log4j_core_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${global_log4j_core_version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.tdunning</pattern>
+                                    <shadedPattern>org.apache.metron.tdunning</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                    <resources>
+                                        <resource>.yaml</resource>
+                                        <resource>LICENSE.txt</resource>
+                                        <resource>ASL2.0</resource>
+                                        <resource>NOTICE.txt</resource>
+                                    </resources>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
new file mode 100644
index 0000000..f999613
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -0,0 +1,102 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.spark.function.GroupByPeriodFunction;
+import org.apache.metron.profiler.spark.function.HBaseWriterFunction;
+import org.apache.metron.profiler.spark.function.MessageRouterFunction;
+import org.apache.metron.profiler.spark.function.ProfileBuilderFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+import static org.apache.spark.sql.functions.sum;
+
+/**
+ * The 'Batch Profiler' that generates profiles by consuming data in batch from archived telemetry.
+ *
+ * <p>The Batch Profiler is executed in Spark.
+ */
+public class BatchProfiler implements Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Execute the Batch Profiler.
+   *
+   * @param spark The spark session.
+   * @param properties The profiler configuration properties.
+   * @param profiles The profile definitions.
+   * @return The number of profile measurements produced.
+   */
+  public long run(SparkSession spark,
+                  Properties properties,
+                  Properties globalProperties,
+                  ProfilerConfig profiles) {
+
+    LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
+    Map<String, String> globals = Maps.fromProperties(globalProperties);
+
+    String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class);
+    String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);
+    LOG.debug("Loading telemetry from '{}'", inputPath);
+
+    // fetch the archived telemetry
+    Dataset<String> telemetry = spark
+            .read()
+            .format(inputFormat)
+            .load(inputPath)
+            .as(Encoders.STRING());
+    LOG.debug("Found {} telemetry record(s)", telemetry.cache().count());
+
+    // find all routes for each message
+    Dataset<MessageRoute> routes = telemetry
+            .flatMap(new MessageRouterFunction(profiles, globals), Encoders.bean(MessageRoute.class));
+    LOG.debug("Generated {} message route(s)", routes.cache().count());
+
+    // build the profiles
+    Dataset<ProfileMeasurementAdapter> measurements = routes
+            .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING())
+            .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class));
+    LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
+
+    // write the profile measurements to HBase
+    long count = measurements
+            .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT())
+            .agg(sum("value"))
+            .head()
+            .getLong(0);
+    LOG.debug("{} profile measurement(s) written to HBase", count);
+
+    return count;
+  }
+}
\ No newline at end of file
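
A hedged sketch of driving the run(...) method above from a Spark driver program. Only BatchProfiler, ProfilerConfig, SparkSession, and the run(...) signature come from this patch; the application name, master URL, empty property sets, and the assumption that ProfilerConfig can be instantiated as a plain bean are illustrative placeholders.

import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.spark.BatchProfiler;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class BatchProfilerDriver {

  public static void main(String[] args) {
    // placeholder session settings; in practice these come from spark-submit
    SparkSession spark = SparkSession.builder()
            .appName("batch-profiler")
            .master("local[*]")
            .getOrCreate();

    // profiler.batch.input.path, period settings, HBase settings, etc.
    Properties profilerProperties = new Properties();

    // Stellar global configuration values
    Properties globalProperties = new Properties();

    // assumption: the profile definitions are normally deserialized from the
    // profiler's JSON configuration; an empty ProfilerConfig is used here only
    // to keep the sketch self-contained
    ProfilerConfig profiles = new ProfilerConfig();

    long count = new BatchProfiler().run(spark, profilerProperties, globalProperties, profiles);
    System.out.println("Produced " + count + " profile measurement(s)");
  }
}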

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
new file mode 100644
index 0000000..054806e
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
@@ -0,0 +1,190 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Defines the configuration values recognized by the Batch Profiler.
+ */
+public enum BatchProfilerConfig {
+
+  PERIOD_DURATION_UNITS("profiler.period.duration.units", "MINUTES", String.class),
+
+  PERIOD_DURATION("profiler.period.duration", 15, Integer.class),
+
+  HBASE_SALT_DIVISOR("profiler.hbase.salt.divisor", 1000, Integer.class),
+
+  HBASE_TABLE_PROVIDER("profiler.hbase.table.provider", "org.apache.metron.hbase.HTableProvider", String.class),
+
+  HBASE_TABLE_NAME("profiler.hbase.table", "profiler", String.class),
+
+  HBASE_COLUMN_FAMILY("profiler.hbase.column.family", "P", String.class),
+
+  HBASE_WRITE_DURABILITY("profiler.hbase.durability", Durability.USE_DEFAULT, Durability.class),
+
+  TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "text", String.class),
+
+  TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class);
+
+  /**
+   * The key for the configuration value.
+   */
+  private String key;
+
+  /**
+   * The default value of the configuration, if none other is specified.
+   */
+  private Object defaultValue;
+
+  /**
+   * The type of the configuration value.
+   */
+  private Class<?> valueType;
+
+  BatchProfilerConfig(String key, Object defaultValue, Class<?> valueType) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+    this.valueType = valueType;
+  }
+
+  /**
+   * Returns the key of the configuration value.
+   */
+  public String getKey() {
+    return key;
+  }
+
+  /**
+   * Returns the default value of the configuration.
+   */
+  public Object getDefault() {
+    return getDefault(valueType);
+  }
+
+  /**
+   * Returns the default value of the configuration, cast to the expected type.
+   *
+   * @param clazz The class of the expected type of the configuration value.
+   * @param <T> The expected type of the configuration value.
+   */
+  public <T> T getDefault(Class<T> clazz) {
+    return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz);
+  }
+
+  /**
+   * Returns the configuration value from a map of configuration values.
+   *
+   * @param config A map containing configuration values.
+   */
+  public Object get(Map<String, String> config) {
+    return getOrDefault(config, defaultValue);
+  }
+
+  /**
+   * Returns the configuration value from a map of configuration values.
+   *
+   * @param properties Configuration properties.
+   */
+  public Object get(Properties properties) {
+    return getOrDefault(properties, defaultValue);
+  }
+
+  /**
+   * Returns the configuration value from a map of configuration values, cast to the expected type.
+   *
+   * @param config A map containing configuration values.
+   */
+  public <T> T get(Map<String, String> config, Class<T> clazz) {
+    return getOrDefault(config, defaultValue, clazz);
+  }
+
+  /**
+   * Returns the configuration value from a map of configuration values, cast to the expected type.
+   *
+   * @param properties Configuration properties.
+   */
+  public <T> T get(Properties properties, Class<T> clazz) {
+    return getOrDefault(properties, defaultValue, clazz);
+  }
+
+  /**
+   * Returns the configuration value from a map of configuration values.  If the value is not specified,
+   * the default value is returned.
+   *
+   * @param config A map containing configuration values.
+   * @param defaultValue The default value to return, if one is not defined.
+   * @return The configuration value or the specified default, if one is not defined.
+   */
+  private Object getOrDefault(Map<String, String> config, Object defaultValue) {
+    return getOrDefault(config, defaultValue, valueType);
+  }
+
+  /**
+   * Returns the configuration value from a map of configuration values.  If the value is not specified,
+   * the default value is returned.
+   *
+   * @param properties A map containing configuration values.
+   * @param defaultValue The default value to return, if one is not defined.
+   * @return The configuration value or the specified default, if one is not defined.
+   */
+  private Object getOrDefault(Properties properties, Object defaultValue) {
+    return getOrDefault(properties, defaultValue, valueType);
+  }
+
+  /**
+   * Returns the configuration value, cast to the expected type, from a map of configuration values.
+   * If the value is not specified, the default value is returned.
+   *
+   * @param config A map containing configuration values.
+   * @param defaultValue The default value to return, if one is not defined.
+   * @param clazz The class of the expected type of the configuration value.
+   * @param <T> The expected type of the configuration value.
+   * @return The configuration value or the specified default, if one is not defined.
+   */
+  private <T> T getOrDefault(Map<String, String> config, Object defaultValue, Class<T> clazz) {
+    Object value = config.getOrDefault(key, defaultValue.toString());
+    return value == null ? null : ConversionUtils.convert(value, clazz);
+  }
+
+  /**
+   * Returns the configuration value, cast to the expected type, from a map of configuration values.
+   * If the value is not specified, the default value is returned.
+   *
+   * @param properties Configuration properties.
+   * @param defaultValue The default value to return, if one is not defined.
+   * @param clazz The class of the expected type of the configuration value.
+   * @param <T> The expected type of the configuration value.
+   * @return The configuration value or the specified default, if one is not defined.
+   */
+  private <T> T getOrDefault(Properties properties, Object defaultValue, Class<T> clazz) {
+    Object value = properties.getOrDefault(key, defaultValue);
+    return value == null ? null : ConversionUtils.convert(value, clazz);
+  }
+
+  @Override
+  public String toString() {
+    return key;
+  }
+}
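
Each constant resolves its value from a Properties object (or a Map), falls back to the default declared above, and converts the result to the requested type via ConversionUtils. A small sketch using only the keys and getters defined in this file (the override value is illustrative):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;

public class BatchProfilerConfigExample {

  public static void main(String[] args) {
    Properties properties = new Properties();

    // override the 15 minute default period duration
    properties.setProperty("profiler.period.duration", "30");

    int duration = PERIOD_DURATION.get(properties, Integer.class);                          // 30
    TimeUnit units = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class)); // MINUTES (default)
    String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);                  // the HDFS default

    System.out.println(duration + " " + units + ", reading from " + inputPath);
  }
}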

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
new file mode 100644
index 0000000..5da7d04
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
@@ -0,0 +1,132 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An adapter for the {@link ProfileMeasurement} class so that the data
+ * can be serialized as required by Spark.
+ *
+ * <p>The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object` well. This
+ * adapter encodes the profile's result as byte[] rather than an Object to work around this.
+ */
+public class ProfileMeasurementAdapter implements Serializable {
+
+  /**
+   * The name of the profile that this measurement is associated with.
+   */
+  private String profileName;
+
+  /**
+   * The name of the entity being profiled.
+   */
+  private String entity;
+
+  /**
+   * A monotonically increasing number identifying the period.  The first period is 0
+   * and began at the epoch.
+   */
+  private Long periodId;
+
+  /**
+   * The duration of each period in milliseconds.
+   */
+  private Long durationMillis;
+
+  /**
+   * The result of evaluating the profile expression.
+   *
+   * The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object`. This
+   * adapter encodes the profile's result as `byte[]` rather than an `Object` to work around this.
+   */
+  private byte[] profileValue;
+
+  public ProfileMeasurementAdapter() {
+    // default constructor required for serialization in Spark
+  }
+
+  public ProfileMeasurementAdapter(ProfileMeasurement measurement) {
+    this.profileName = measurement.getProfileName();
+    this.entity = measurement.getEntity();
+    this.periodId = measurement.getPeriod().getPeriod();
+    this.durationMillis = measurement.getPeriod().getDurationMillis();
+    this.profileValue = SerDeUtils.toBytes(measurement.getProfileValue());
+  }
+
+  public ProfileMeasurement toProfileMeasurement() {
+    ProfilePeriod period = ProfilePeriod.fromPeriodId(periodId, durationMillis, TimeUnit.MILLISECONDS);
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName(profileName)
+            .withEntity(entity)
+            .withPeriod(period)
+            .withProfileValue(SerDeUtils.fromBytes(profileValue, Object.class));
+    return measurement;
+  }
+
+  public String getProfileName() {
+    return profileName;
+  }
+
+  public void setProfileName(String profileName) {
+    this.profileName = profileName;
+  }
+
+  public String getEntity() {
+    return entity;
+  }
+
+  public void setEntity(String entity) {
+    this.entity = entity;
+  }
+
+  public Long getPeriodId() {
+    return periodId;
+  }
+
+  public void setPeriodId(Long periodId) {
+    this.periodId = periodId;
+  }
+
+  public Long getDurationMillis() {
+    return durationMillis;
+  }
+
+  public void setDurationMillis(Long durationMillis) {
+    this.durationMillis = durationMillis;
+  }
+
+  public byte[] getProfileValue() {
+    return profileValue;
+  }
+
+  public void setProfileValue(byte[] profileValue) {
+    this.profileValue = profileValue;
+  }
+
+  public void setProfileValue(Object profileValue) {
+    this.profileValue = SerDeUtils.toBytes(profileValue);
+  }
+}
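
A minimal sketch of the round trip the adapter enables: wrap a ProfileMeasurement so that Spark's bean encoder can serialize it, then recover the original measurement. The profile name, entity, value, and period settings below are illustrative; the fluent ProfileMeasurement builder methods and ProfilePeriod.fromTimestamp(...) are those referenced elsewhere in this patch.

import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;

import java.util.concurrent.TimeUnit;

public class AdapterRoundTripExample {

  public static void main(String[] args) {
    ProfileMeasurement original = new ProfileMeasurement()
            .withProfileName("example-profile")
            .withEntity("10.0.0.1")
            .withPeriod(ProfilePeriod.fromTimestamp(System.currentTimeMillis(), 15, TimeUnit.MINUTES))
            .withProfileValue(42);

    // the adapter carries the profile value as byte[], which Encoders.bean(...) can handle
    ProfileMeasurementAdapter adapter = new ProfileMeasurementAdapter(original);

    // recover the measurement, including the deserialized profile value
    ProfileMeasurement recovered = adapter.toProfileMeasurement();
    System.out.println(recovered.getProfileName() + " = " + recovered.getProfileValue());
  }
}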

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
new file mode 100644
index 0000000..1b602f4
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.spark.api.java.function.MapFunction;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * Defines how {@link MessageRoute} are grouped.
+ *
+ * The routes are grouped by (profile, entity, periodId) so that all of the required
+ * messages are available to produce a {@link org.apache.metron.profiler.ProfileMeasurement}.
+ */
+public class GroupByPeriodFunction implements MapFunction<MessageRoute, String> {
+
+  /**
+   * The duration of each profile period.
+   */
+  private int periodDuration;
+
+  /**
+   * The units of the period duration.
+   */
+  private TimeUnit periodDurationUnits;
+
+  public GroupByPeriodFunction(Properties profilerProperties) {
+    periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(profilerProperties, String.class));
+    periodDuration = PERIOD_DURATION.get(profilerProperties, Integer.class);
+  }
+
+  @Override
+  public String call(MessageRoute route) {
+    ProfilePeriod period = ProfilePeriod.fromTimestamp(route.getTimestamp(), periodDuration, periodDurationUnits);
+    return route.getProfileDefinition().getProfile() + "-" + route.getEntity() + "-" + period.getPeriod();
+  }
+}
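
The grouping key is simply `<profile>-<entity>-<periodId>`, so every route that falls within the same profile period for the same entity is handled by a single call to ProfileBuilderFunction. A brief sketch of what the key looks like for an arbitrary timestamp (the profile name, entity, and timestamp are illustrative):

import org.apache.metron.profiler.ProfilePeriod;

import java.util.concurrent.TimeUnit;

public class GroupingKeyExample {

  public static void main(String[] args) {
    long timestampMillis = 1535061498000L;  // an arbitrary event time

    // mirrors GroupByPeriodFunction.call(...) with the default 15 minute period
    ProfilePeriod period = ProfilePeriod.fromTimestamp(timestampMillis, 15, TimeUnit.MINUTES);
    String key = "example-profile" + "-" + "10.0.0.1" + "-" + period.getPeriod();

    System.out.println(key);
  }
}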

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
new file mode 100644
index 0000000..cfabd94
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -0,0 +1,171 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.commons.collections4.IteratorUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_SALT_DIVISOR;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_WRITE_DURABILITY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * Writes the profile measurements to HBase in Spark.
+ */
+public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurementAdapter, Integer> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private TableProvider tableProvider;
+
+  /**
+   * The name of the HBase table to write to.
+   */
+  private String tableName;
+
+  /**
+   * The durability guarantee when writing to HBase.
+   */
+  private Durability durability;
+
+  /**
+   * Builds the HBase row key.
+   */
+  private RowKeyBuilder rowKeyBuilder;
+
+  /**
+   * Assembles the columns for HBase.
+   */
+  private ColumnBuilder columnBuilder;
+
+  public HBaseWriterFunction(Properties properties) {
+    tableName = HBASE_TABLE_NAME.get(properties, String.class);
+    durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);
+
+    // row key builder
+    int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
+    int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
+    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
+    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);
+
+    // column builder
+    String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
+    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+    // hbase table provider
+    String providerImpl = HBASE_TABLE_PROVIDER.get(properties, String.class);
+    tableProvider = createTableProvider(providerImpl);
+  }
+
+  /**
+   * Writes a set of measurements to HBase.
+   *
+   * @param iterator The measurements to write.
+   * @return The number of measurements written to HBase.
+   */
+  @Override
+  public Iterator<Integer> call(Iterator<ProfileMeasurementAdapter> iterator) throws Exception {
+    int count = 0;
+    LOG.debug("About to write profile measurement(s) to HBase");
+
+    // do not open hbase connection, if nothing to write
+    List<ProfileMeasurementAdapter> measurements = IteratorUtils.toList(iterator);
+    if(measurements.size() > 0) {
+
+      // open an HBase connection
+      Configuration config = HBaseConfiguration.create();
+      try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {
+
+        for (ProfileMeasurementAdapter adapter : measurements) {
+          ProfileMeasurement m = adapter.toProfileMeasurement();
+          client.addMutation(rowKeyBuilder.rowKey(m), columnBuilder.columns(m), durability);
+        }
+        count = client.mutate();
+
+      } catch (IOException e) {
+        LOG.error("Unable to open connection to HBase", e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    LOG.debug("{} profile measurement(s) written to HBase", count);
+    return IteratorUtils.singletonIterator(count);
+  }
+
+  /**
+   * Set the {@link TableProvider} using the class name of the provider.
+   * @param providerImpl The name of the class.
+   * @return This writer function, to allow method chaining.
+   */
+  public HBaseWriterFunction withTableProviderImpl(String providerImpl) {
+    this.tableProvider = createTableProvider(providerImpl);
+    return this;
+  }
+
+  /**
+   * Creates a TableProvider based on a class name.
+   * @param providerImpl The class name of a TableProvider
+   */
+  private static TableProvider createTableProvider(String providerImpl) {
+    LOG.trace("Creating table provider; className={}", providerImpl);
+
+    // if class name not defined, use a reasonable default
+    if(StringUtils.isEmpty(providerImpl) || providerImpl.charAt(0) == '$') {
+      return new HTableProvider();
+    }
+
+    // instantiate the table provider
+    try {
+      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(providerImpl);
+      return clazz.getConstructor().newInstance();
+
+    } catch (InstantiationException | IllegalAccessException | IllegalStateException |
+            InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
+      throw new IllegalStateException("Unable to instantiate connector", e);
+    }
+  }
+}
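
For orientation, a minimal sketch of applying this writer with Spark's mapPartitions; the
`measurements` dataset and the `profilerProperties` object are illustrative assumptions:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    // Sketch only: one HBase connection is opened per partition and each partition
    // reports back the number of measurements it wrote.
    Dataset<Integer> counts = measurements.mapPartitions(
            new HBaseWriterFunction(profilerProperties),
            Encoders.INT());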

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
new file mode 100644
index 0000000..cf8029f
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
@@ -0,0 +1,113 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.DefaultMessageRouter;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.MessageRouter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The function responsible for finding routes for a given message in Spark.
+ */
+public class MessageRouterFunction implements FlatMapFunction<String, MessageRoute> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The global configuration used for the execution of Stellar.
+   */
+  private Map<String, String> globals;
+
+  /**
+   * The profile definitions.
+   */
+  private ProfilerConfig profilerConfig;
+
+  public MessageRouterFunction(ProfilerConfig profilerConfig, Map<String, String> globals) {
+    this.profilerConfig = profilerConfig;
+    this.globals = globals;
+  }
+
+  /**
+   * Find all routes for a given telemetry message.
+   *
+   * <p>A message may need routed to multiple profiles should it be needed by more than one.  A
+   * message may also not be routed should it not be needed by any profiles.
+   *
+   * @param jsonMessage The raw JSON message.
+   * @return The routes found for the message; empty if no profile needs it.
+   */
+  @Override
+  public Iterator<MessageRoute> call(String jsonMessage) throws Exception {
+    List<MessageRoute> routes;
+
+    JSONParser parser = new JSONParser();
+    Context context = TaskUtils.getContext(globals);
+    MessageRouter router = new DefaultMessageRouter(context);
+
+    // parse the raw message
+    Optional<JSONObject> message = toMessage(jsonMessage, parser);
+    if(message.isPresent()) {
+
+      // find all routes
+      routes = router.route(message.get(), profilerConfig, context);
+      LOG.trace("Found {} route(s) for a message", routes.size());
+
+    } else {
+      // the message is not valid and must be ignored
+      routes = Collections.emptyList();
+      LOG.trace("No route possible. Unable to parse message.");
+    }
+
+    return routes.iterator();
+  }
+
+  /**
+   * Parses the raw JSON of a message.
+   *
+   * @param json The raw JSON to parse.
+   * @param parser The parser to use.
+   * @return The parsed telemetry message, or empty if the JSON could not be parsed.
+   */
+  private static Optional<JSONObject> toMessage(String json, JSONParser parser) {
+    try {
+      JSONObject message = (JSONObject) parser.parse(json);
+      return Optional.of(message);
+
+    } catch(Throwable e) {
+      LOG.warn(String.format("Unable to parse message, message will be ignored; message='%s'", json), e);
+      return Optional.empty();
+    }
+  }
+}
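
A minimal sketch of how this router might be applied to raw telemetry; the `telemetry`
dataset, the `profilerConfig` and `globals` variables, and the kryo encoder are assumptions
made for illustration:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    // Sketch only: fan each JSON message out to zero or more routes, one per profile that needs it.
    Dataset<MessageRoute> routes = telemetry.flatMap(
            new MessageRouterFunction(profilerConfig, globals),
            Encoders.kryo(MessageRoute.class));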

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
new file mode 100644
index 0000000..273695b
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
@@ -0,0 +1,107 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.profiler.DefaultMessageDistributor;
+import org.apache.metron.profiler.MessageDistributor;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Comparator.comparing;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * The function responsible for building profiles in Spark.
+ */
+public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurementAdapter>  {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private long periodDurationMillis;
+  private Map<String, String> globals;
+
+  public ProfileBuilderFunction(Properties properties, Map<String, String> globals) {
+    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
+    int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
+    this.periodDurationMillis = periodDurationUnits.toMillis(periodDuration);
+    this.globals = globals;
+  }
+
+  /**
+   * Build a profile from a set of message routes.
+   *
+   * <p>This assumes that all of the necessary routes have been provided
+   *
+   * @param group The group identifier.
+   * @param iterator The message routes.
+   * @return The profile measurement built from the routes.
+   */
+  @Override
+  public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator) throws Exception {
+    // create the distributor; the TTL and max-routes settings are effectively unused because the distributor is discarded right after this group is processed
+    int maxRoutes = Integer.MAX_VALUE;
+    long profileTTLMillis = Long.MAX_VALUE;
+    MessageDistributor distributor = new DefaultMessageDistributor(periodDurationMillis, profileTTLMillis, maxRoutes);
+    Context context = TaskUtils.getContext(globals);
+
+    // sort the messages/routes
+    List<MessageRoute> routes = toStream(iterator)
+            .sorted(comparing(rt -> rt.getTimestamp()))
+            .collect(Collectors.toList());
+    LOG.debug("Building a profile for group '{}' from {} message(s)", group, routes.size());
+
+    // apply each message/route to build the profile
+    for(MessageRoute route: routes) {
+      distributor.distribute(route, context);
+    }
+
+    // flush the profile
+    List<ProfileMeasurement> measurements = distributor.flush();
+    if(measurements.size() > 1) {
+      throw new IllegalStateException("No more than 1 profile measurement is expected");
+    }
+
+    ProfileMeasurement m = measurements.get(0);
+    LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}",
+            m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue());
+    return new ProfileMeasurementAdapter(m);
+  }
+
+  private static <T> Stream<T> toStream(Iterator<T> iterator) {
+    Iterable<T> iterable = () -> iterator;
+    return StreamSupport.stream(iterable.spliterator(), false);
+  }
+}
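
A minimal sketch of applying this builder to routes that have already been grouped by
GroupByPeriodFunction; the `grouped` dataset is an illustrative assumption, and
ProfileMeasurementAdapter is assumed to follow Java bean conventions so that a bean
encoder can be used:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    // Sketch only: each (profile, entity, period) group collapses to a single measurement.
    Dataset<ProfileMeasurementAdapter> measurements = grouped.mapGroups(
            new ProfileBuilderFunction(profilerProperties, globals),
            Encoders.bean(ProfileMeasurementAdapter.class));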

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java
new file mode 100644
index 0000000..d401f12
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class TaskUtils implements Serializable {
+
+  /**
+   * Create the execution context for running Stellar.
+   */
+  public static Context getContext(Map<String, String> globals) {
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> globals)
+            .build();
+    StellarFunctions.initialize(context);
+    return context;
+  }
+}
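
A minimal usage sketch; the contents of the `globals` map are illustrative. Building the
Context inside each task lets the functions above carry only a simple Map of globals into
the executors, rather than a fully-initialized Stellar context:

    import org.apache.metron.stellar.dsl.Context;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: each Spark task builds its own Stellar execution context on demand.
    Map<String, String> globals = new HashMap<>();
    Context context = TaskUtils.getContext(globals);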

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
new file mode 100644
index 0000000..f560740
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -0,0 +1,111 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+import static org.junit.Assert.assertEquals;
+
+public class BatchProfilerIntegrationTest {
+
+  private static SparkSession spark;
+  private MockHTable profilerTable;
+  private Properties profilerProperties;
+
+  @BeforeClass
+  public static void setupSpark() {
+    SparkConf conf = new SparkConf()
+            .setMaster("local")
+            .setAppName("BatchProfilerIntegrationTest")
+            .set("spark.sql.shuffle.partitions", "8");
+    spark = SparkSession
+            .builder()
+            .config(conf)
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void tearDownSpark() {
+    if(spark != null) {
+      spark.close();
+    }
+  }
+
+  @Before
+  public void setup() {
+    profilerProperties = new Properties();
+
+    // define the source of the input telemetry
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    // define where the output will go
+    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
+    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
+    profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+
+    // create the mock hbase table
+    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
+  }
+
+  @Test
+  public void testBatchProfiler() {
+
+    // run the batch profiler
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), getProfile());
+
+    List<Put> puts = profilerTable.getPutLog();
+    assertEquals(2, puts.size());
+  }
+
+
+  private ProfilerConfig getProfile() {
+    ProfileConfig profile = new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+    return new ProfilerConfig()
+            .withProfile(profile);
+  }
+
+  private Properties getGlobals() {
+    return new Properties();
+  }
+}
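
For context, a rough sketch of how a call like profiler.run(spark, profilerProperties,
getGlobals(), getProfile()) might chain the functions defined earlier; the names, the
`globals` map (a Map<String, String> of global configuration), and the encoder choices are
illustrative, not the BatchProfiler implementation itself:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    // Sketch only: read telemetry, route it, build per-period measurements, write to HBase.
    Dataset<String> telemetry = spark.read().textFile(
            TELEMETRY_INPUT_PATH.get(profilerProperties, String.class));
    Dataset<Integer> counts = telemetry
            .flatMap(new MessageRouterFunction(getProfile(), globals), Encoders.kryo(MessageRoute.class))
            .groupByKey(new GroupByPeriodFunction(profilerProperties), Encoders.STRING())
            .mapGroups(new ProfileBuilderFunction(profilerProperties, globals), Encoders.bean(ProfileMeasurementAdapter.class))
            .mapPartitions(new HBaseWriterFunction(profilerProperties), Encoders.INT());
    // `counts` holds the per-partition number of measurements written.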

http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
new file mode 100644
index 0000000..55f3e21
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -0,0 +1,176 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.commons.collections4.IteratorUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+
+public class HBaseWriterFunctionTest {
+
+  Properties profilerProperties;
+
+  @Before
+  public void setup() {
+    profilerProperties = getProfilerProperties();
+
+    // create a mock table for HBase
+    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
+    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
+    MockHBaseTableProvider.addToCache(tableName, columnFamily);
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+
+    JSONObject message = getMessage();
+    String entity = (String) message.get("ip_src_addr");
+    long timestamp = (Long) message.get("timestamp");
+    ProfileConfig profile = getProfile();
+
+    // setup the profile measurements that will be written
+    List<ProfileMeasurementAdapter> measurements = createMeasurements(1, entity, timestamp, profile);
+
+    // setup the function to test
+    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
+    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+
+    // write the measurements
+    Iterator<Integer> results = function.call(measurements.iterator());
+
+    // validate the result
+    List<Integer> counts = IteratorUtils.toList(results);
+    Assert.assertEquals(1, counts.size());
+    Assert.assertEquals(1, counts.get(0).intValue());
+  }
+
+  @Test
+  public void testWriteMany() throws Exception {
+
+    JSONObject message = getMessage();
+    String entity = (String) message.get("ip_src_addr");
+    long timestamp = (Long) message.get("timestamp");
+    ProfileConfig profile = getProfile();
+
+    // setup the profile measurements that will be written
+    List<ProfileMeasurementAdapter> measurements = createMeasurements(10, entity, timestamp, profile);
+
+    // setup the function to test
+    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
+    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+
+    // write the measurements
+    Iterator<Integer> results = function.call(measurements.iterator());
+
+    // validate the result
+    List<Integer> counts = IteratorUtils.toList(results);
+    Assert.assertEquals(1, counts.size());
+    Assert.assertEquals(10, counts.get(0).intValue());
+  }
+
+  @Test
+  public void testWriteNone() throws Exception {
+
+    // there are no profile measurements to write
+    List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+
+    // setup the function to test
+    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
+    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+
+    // write the measurements
+    Iterator<Integer> results = function.call(measurements.iterator());
+
+    // validate the result
+    List<Integer> counts = IteratorUtils.toList(results);
+    Assert.assertEquals(1, counts.size());
+    Assert.assertEquals(0, counts.get(0).intValue());
+  }
+
+  /**
+   * Create a list of measurements for testing.
+   *
+   * @param count The number of messages to create.
+   * @param entity The entity.
+   * @param timestamp The timestamp.
+   * @param profile The profile definition.
+   * @return The list of profile measurements.
+   */
+  private List<ProfileMeasurementAdapter> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
+    List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+
+    for(int i=0; i<count; i++) {
+      ProfileMeasurement measurement = new ProfileMeasurement()
+              .withProfileName(profile.getProfile())
+              .withEntity(entity)
+              .withPeriod(timestamp, 15, TimeUnit.MINUTES);
+
+      // wrap the measurement using the adapter
+      measurements.add(new ProfileMeasurementAdapter(measurement));
+    }
+
+    return measurements;
+  }
+
+  /**
+   * Returns a telemetry message to use for testing.
+   */
+  private JSONObject getMessage() {
+    JSONObject message = new JSONObject();
+    message.put("ip_src_addr", "192.168.1.1");
+    message.put("status", "red");
+    message.put("timestamp", System.currentTimeMillis());
+    return message;
+  }
+
+  /**
+   * Returns profiler properties to use for testing.
+   */
+  private Properties getProfilerProperties() {
+    return new Properties();
+  }
+
+  /**
+   * Returns a profile definition to use for testing.
+   */
+  private ProfileConfig getProfile() {
+    return new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+
+  }
+}