You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/23 21:58:56 UTC
[4/4] metron git commit: METRON-1707 Port Profiler to Spark
(nickwallen) closes apache/metron#1150
METRON-1707 Port Profiler to Spark (nickwallen) closes apache/metron#1150
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bfbf018
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bfbf018
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bfbf018
Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 3bfbf018a9c3e1c74dc934901446b5111a0ada03
Parents: 6fb50a1
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Aug 23 17:58:18 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Aug 23 17:58:18 2018 -0400
----------------------------------------------------------------------
dependencies_with_url.csv | 64 +++++-
.../profiler/DefaultMessageDistributor.java | 1 -
.../apache/metron/profiler/MessageRoute.java | 10 +-
metron-analytics/metron-profiler-spark/pom.xml | 195 +++++++++++++++++++
.../metron/profiler/spark/BatchProfiler.java | 102 ++++++++++
.../profiler/spark/BatchProfilerConfig.java | 190 ++++++++++++++++++
.../spark/ProfileMeasurementAdapter.java | 132 +++++++++++++
.../spark/function/GroupByPeriodFunction.java | 60 ++++++
.../spark/function/HBaseWriterFunction.java | 171 ++++++++++++++++
.../spark/function/MessageRouterFunction.java | 113 +++++++++++
.../spark/function/ProfileBuilderFunction.java | 107 ++++++++++
.../profiler/spark/function/TaskUtils.java | 41 ++++
.../spark/BatchProfilerIntegrationTest.java | 111 +++++++++++
.../spark/function/HBaseWriterFunctionTest.java | 176 +++++++++++++++++
.../function/MessageRouterFunctionTest.java | 114 +++++++++++
.../function/ProfileBuilderFunctionTest.java | 98 ++++++++++
.../src/test/resources/log4j.properties | 31 +++
.../src/test/resources/telemetry.json | 100 ++++++++++
metron-analytics/pom.xml | 1 +
.../configuration/profiler/ProfileResult.java | 4 +
.../profiler/ProfileResultExpressions.java | 4 +
pom.xml | 1 +
22 files changed, 1822 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 6ac1f23..6b4385b 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -33,13 +33,18 @@ com.maxmind.geoip2:geoip2:jar:2.8.0:compile,Apache v2,https://github.com/maxmind
com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile,CDDL,http://jaxb.java.net/
com.sun.xml.bind:jaxb-impl:jar:2.2.5-2:compile,CDDL,http://jaxb.java.net/
com.twitter:jsr166e:jar:1.1.0:compile,CC0 1.0 Universal,http://github.com/twitter/jsr166e
+com.twitter:chill-java:jar:0.8.4:compile,ASLv2,https://github.com/twitter/chill
+com.twitter:chill_2.11:jar:0.8.4:compile,ASLv2,https://github.com/twitter/chill
it.unimi.dsi:fastutil:jar:7.0.6:compile,ASLv2,https://github.com/vigna/fastutil
javassist:javassist:jar:3.12.1.GA:compile,Apache v2,http://www.javassist.org/
javax.activation:activation:jar:1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp
+javax.activation:activation:jar:1.1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp
javax.annotation:jsr250-api:jar:1.0:compile,COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0,http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html
javax.annotation:javax.annotation-api:jar:1.3.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
+javax.annotation:javax.annotation-api:jar:1.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net
+javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile,CDDL 1.1,https://github.com/jax-rs/api
javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/
javax.xml.bind:jaxb-api:jar:2.2.2:compile,CDDL,https://jaxb.dev.java.net/
javax.xml.bind:jaxb-api:jar:2.3.0:compile,CDDL,https://jaxb.dev.java.net/
@@ -47,25 +52,41 @@ javax.xml.stream:stax-api:jar:1.0-2:compile,COMMON DEVELOPMENT AND DISTRIBUTION
jline:jline:jar:0.9.94:compile,BSD,http://jline.sourceforge.net
junit:junit:jar:4.12:compile,Eclipse Public License 1.0,http://junit.org
junit:junit:jar:4.4:compile,Common Public License Version 1.0,http://junit.org
+net.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite
net.sf.jopt-simple:jopt-simple:jar:3.2:compile,The MIT License,http://jopt-simple.sourceforge.net
net.sf.jopt-simple:jopt-simple:jar:4.9:compile,The MIT License,http://jopt-simple.sourceforge.net
net.sf.saxon:Saxon-HE:jar:9.5.1-5:compile,Mozilla Public License Version 2.0,http://www.saxonica.com/
org.abego.treelayout:org.abego.treelayout.core:jar:1.0.1:compile,BSD 3-Clause "New" or "Revised" License (BSD-3-Clause),http://code.google.com/p/treelayout/
org.adrianwalker:multiline-string:jar:0.1.2:compile,Common Public License Version 1.0,https://github.com/benelog/multiline
org.antlr:antlr4-runtime:jar:4.5:compile,BSD 3-Clause License,http://www.antlr.org
+org.bouncycastle:bcprov-jdk15on:jar:1.52:compile,MIT,https://www.bouncycastle.org/license.html
org.clojure:clojure:jar:1.6.0:compile,Eclipse Public License 1.0,http://clojure.org/
org.clojure:clojure:jar:1.7.0:compile,Eclipse Public License 1.0,http://clojure.org/
org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
org.codehaus.jackson:jackson-jaxrs:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
org.codehaus.jackson:jackson-xc:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
org.codehaus.jackson:jackson-xc:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
+org.codehaus.janino:commons-compiler:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
+org.codehaus.janino:janino:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
+org.json4s:json4s-ast_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
+org.json4s:json4s-core_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
+org.json4s:json4s-jackson_2.11:jar:3.2.11:compile,ASLv2,https://github.com/json4s/json4s
org.jruby.jcodings:jcodings:jar:1.0.8:compile,MIT License,https://github.com/jruby/jcodings
org.jruby.joni:joni:jar:2.1.2:compile,MIT License,https://github.com/jruby/joni
+org.lz4:lz4-java:jar:1.4.0:compile,ASLv2,https://github.com/lz4/lz4-java
org.mitre.taxii:taxii:jar:1.1.0.1:compile,The BSD 3-Clause License,https://github.com/TAXIIProject/java-taxii
org.mitre:stix:jar:1.2.0.2:compile,The BSD 3-Clause License,https://github.com/STIXProject/java-stix
org.mockito:mockito-core:jar:1.10.19:compile,The MIT License,http://www.mockito.org
+org.roaringbitmap:RoaringBitmap:jar:0.5.11:compile,ASLv2,https://github.com/RoaringBitmap/RoaringBitmap
org.scala-lang:scala-library:jar:2.10.6:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-compiler:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-library:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-reflect:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scalap:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
+oro:oro:jar:2.0.8:compile,ASLv2,http://attic.apache.org/projects/jakarta-oro.html
xmlenc:xmlenc:jar:0.52:compile,The BSD License,http://xmlenc.sourceforge.net
asm:asm:jar:3.1:compile,BSD,http://asm.ow2.org/
com.sun.jersey.contribs:jersey-guice:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
@@ -103,7 +124,10 @@ org.slf4j:slf4j-log4j12:jar:1.7.5:compile,MIT,http://www.slf4j.org
org.slf4j:slf4j-log4j12:jar:1.7.7:compile,MIT,http://www.slf4j.org
org.slf4j:slf4j-simple:jar:1.7.7:compile,MIT,http://www.slf4j.org
org.slf4j:jcl-over-slf4j:jar:1.7.7:compile,MIT,http://www.slf4j.org
+org.slf4j:jcl-over-slf4j:jar:1.7.16:compile,MIT,http://www.slf4j.org
org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:jul-to-slf4j:jar:1.7.16:compile,MIT,http://www.slf4j.org
org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
org.slf4j:jul-to-slf4j:jar:1.7.25:compile,MIT,http://www.slf4j.org
aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net
@@ -113,7 +137,9 @@ com.github.tony19:named-regexp:jar:0.2.3:compile,Apache License, Version 2.0,
com.google.code.findbugs:jsr305:jar:1.3.9:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
com.google.code.findbugs:jsr305:jar:3.0.0:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
com.google.code.findbugs:annotations:jar:2.0.1:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
-com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,
+com.carrotsearch:hppc:jar:0.7.1:compile,ASLv2,https://github.com/carrotsearch/hppc
+com.carrotsearch:hppc:jar:0.7.2:compile,ASLv2,https://github.com/carrotsearch/hppc
+com.clearspring.analytics:stream:jar:2.7.0:compile,ASLv2,https://github.com/addthis/stream-lib
com.clearspring.analytics:stream:jar:2.9.5:compile,ASLv2,https://github.com/addthis/stream-lib
com.codahale.metrics:metrics-core:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
com.codahale.metrics:metrics-graphite:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
@@ -132,6 +158,7 @@ com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.c
com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -147,6 +174,8 @@ com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.9.5:compile,ASLv2,htt
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
+com.fasterxml.jackson.module:jackson-module-paranamer:jar:2.7.9:compile,ASLv2,https://github.com/FasterXML/jackson-modules-base
+com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.6.7.1:compile,ASLv2,https://github.com/FasterXML/jackson-module-scala
com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
com.fasterxml:classmate:jar:1.3.4:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
@@ -164,10 +193,14 @@ com.lmax:disruptor:jar:3.3.2:compile,The Apache Software License, Version 2.0,ht
com.googlecode.json-simple:json-simple:jar:1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/
com.googlecode.json-simple:json-simple:jar:1.1.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/json-simple/
com.jamesmurty.utils:java-xmlbuilder:jar:0.4:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/
+com.jamesmurty.utils:java-xmlbuilder:jar:1.1:compile,Apache License, Version 2.0,http://code.google.com/p/java-xmlbuilder/
com.ning:compress-lzf:jar:1.0.2:compile,Apache License 2.0,http://github.com/ning/compress
+com.ning:compress-lzf:jar:1.0.3:compile,Apache License 2.0,http://github.com/ning/compress
com.opencsv:opencsv:jar:3.7:compile,Apache 2,http://opencsv.sf.net
com.spatial4j:spatial4j:jar:0.5:compile,The Apache Software License, Version 2.0,
com.tdunning:t-digest:jar:3.0:compile,The Apache Software License, Version 2.0,https://github.com/tdunning/t-digest
+com.univocity:univocity-parsers:jar:2.5.9:compile,ASLv2,https://github.com/uniVocity/univocity-parsers
+com.vlkan:flatbuffers:jar:1.2.0-3f79e055:compile,ASLv2,https://github.com/vy/flatbuffers
com.yammer.metrics:metrics-core:jar:2.2.0:compile,ASLv2,
commons-beanutils:commons-beanutils-core:jar:1.8.0:compile,ASLv2,http://commons.apache.org/beanutils/
commons-beanutils:commons-beanutils-core:jar:1.8.0:provided,ASLv2,http://commons.apache.org/beanutils/
@@ -203,19 +236,28 @@ commons-lang:commons-lang:jar:2.6:provided,ASLv2,http://commons.apache.org/lang/
commons-logging:commons-logging:jar:1.1.1:compile,ASLv2,http://commons.apache.org/logging
commons-logging:commons-logging:jar:1.1.3:compile,ASLv2,http://commons.apache.org/proper/commons-logging/
commons-logging:commons-logging:jar:1.2:compile,ASLv2,http://commons.apache.org/proper/commons-logging/
+commons-net:commons-net:jar:2.2:compile,ASLv2,http://commons.apache.org/net/
commons-net:commons-net:jar:3.1:compile,ASLv2,http://commons.apache.org/net/
commons-net:commons-net:jar:3.1:provided,ASLv2,http://commons.apache.org/net/
commons-text:commons-text:jar:1.1:compile,ASLv2,http://commons.apache.org/proper/commons-text/
commons-validator:commons-validator:jar:1.4.0:compile,ASLv2,http://commons.apache.org/validator/
commons-validator:commons-validator:jar:1.5.1:compile,ASLv2,http://commons.apache.org/proper/commons-validator/
commons-validator:commons-validator:jar:1.6:compile,ASLv2,http://commons.apache.org/proper/commons-validator/
+net.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite
+io.airlift:aircompressor:jar:0.8:compile,ASLv2,https://github.com/airlift/aircompressor
io.confluent:kafka-avro-serializer:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
io.confluent:kafka-schema-registry-client:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
+io.dropwizard.metrics:metrics-core:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,
io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,
-io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/
+io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,
io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/
io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/
+io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/
+io.netty:netty:jar:3.10.5.Final:compile,Apache License, Version 2.0,http://netty.io/
io.thekraken:grok:jar:0.1.0:compile,Apache License, Version 2.0,http://maven.apache.org
javax.inject:javax.inject:jar:1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/atinject/
joda-time:joda-time:jar:2.3:compile,Apache 2,http://www.joda.org/joda-time/
@@ -224,9 +266,12 @@ joda-time:joda-time:jar:2.9.9:compile,Apache 2,http://www.joda.org/joda-time/
log4j:log4j:jar:1.2.15:compile,The Apache Software License, Version 2.0,http://logging.apache.org:80/log4j/1.2/
log4j:log4j:jar:1.2.16:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/
log4j:log4j:jar:1.2.17:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/
+net.iharder:base64:jar:2.3.8:compile,Public Domain,http://iharder.sourceforge.net/current/java/base64/
net.java.dev.jets3t:jets3t:jar:0.9.0:compile,Apache License, Version 2.0,http://www.jets3t.org
+net.java.dev.jets3t:jets3t:jar:0.9.4:compile,Apache License, Version 2.0,http://www.jets3t.org
net.jpountz.lz4:lz4:jar:1.2.0:compile,The Apache Software License, Version 2.0,https://github.com/jpountz/lz4-java
net.jpountz.lz4:lz4:jar:1.3.0:compile,The Apache Software License, Version 2.0,https://github.com/jpountz/lz4-java
+net.sf.py4j:py4j:jar:0.10.7:compile,BSD,https://www.py4j.org/
nl.jqno.equalsverifier:equalsverifier:jar:2.0.2:compile,The Apache Software License, Version 2.0,http://www.jqno.nl/equalsverifier
org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
@@ -354,6 +399,7 @@ org.springframework.security:spring-security-core:jar:4.1.3.RELEASE:compile,ASLv
org.springframework.security:spring-security-core:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
org.springframework.security:spring-security-web:jar:4.1.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
org.springframework.security:spring-security-web:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
+org.spark-project.spark:unused:jar:1.0.0:compile,ASLv2,https://spark.apache.org
antlr:antlr:jar:2.7.7:compile,BSD 3-Clause License,http://www.antlr2.org
com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/license.html
com.h2database:h2:jar:1.4.197:compile,EPL 1.0,http://www.h2database.com/html/license.html
@@ -370,6 +416,7 @@ org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://g
org.springframework.kafka:spring-kafka:jar:2.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka
ch.hsr:geohash:jar:1.3.0:compile,ASLv2,https://github.com/kungfoo/geohash-java
org.locationtech.spatial4j:spatial4j:jar:0.6:compile,ASLv2,https://github.com/locationtech/spatial4j
+com.github.luben:zstd-jni:jar:1.3.2-2:compile,BSD,https://github.com/luben/zstd-jni
com.github.spullara.mustache.java:compiler:jar:0.9.3:compile,ASLv2,https://github.com/spullara/mustache.java/blob/master/LICENSE
io.netty:netty-buffer:jar:4.1.13.Final:compile,ASLv2,http://netty.io/
io.netty:netty-codec-http:jar:4.1.13.Final:compile,ASLv2,http://netty.io/
@@ -395,6 +442,19 @@ org.elasticsearch:securesm:jar:1.1:compile,ASLv2,https://github.com/elastic/elas
org.hdrhistogram:HdrHistogram:jar:2.1.9:compile,BSD,https://github.com/HdrHistogram/HdrHistogram/blob/master/LICENSE.txt
com.trendmicro:tlsh:jar:3.7.1:compile,ASLv2,https://github.com/trendmicro/tlsh
org.glassfish:javax.json:jar:1.0.4:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/jsonp
+org.glassfish.hk2.external:aopalliance-repackaged:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2.external:javax.inject:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:hk2-api:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:hk2-locator:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:hk2-utils:jar:2.4.0-b34:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile,Common Development and Distribution License (CDDL) v1.0,https://github.com/javaee/hk2
+org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.containers:jersey-container-servlet:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.core:jersey-client:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.core:jersey-common:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.core:jersey-server:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
+org.glassfish.jersey.media:jersey-media-jaxb:jar:2.22.2:compile,EPL 2.0,https://github.com/eclipse-ee4j/jersey
org.eclipse.persistence:javax.persistence:jar:2.1.1:compile,EPL 1.0,http://www.eclipse.org/eclipselink
org.eclipse.persistence:org.eclipse.persistence.antlr:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index d950b07..673072b 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -28,7 +28,6 @@ import com.google.common.cache.RemovalNotification;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.stellar.dsl.Context;
-import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
index e76b897..7cdb607 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -22,11 +22,11 @@ package org.apache.metron.profiler;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.json.simple.JSONObject;
import java.io.Serializable;
+import java.util.Map;
/**
* Defines the 'route' a message must take through the Profiler.
@@ -74,6 +74,10 @@ public class MessageRoute implements Serializable {
this.timestamp = timestamp;
}
+ public MessageRoute() {
+ // necessary for serialization
+ }
+
public String getEntity() {
return entity;
}
@@ -98,6 +102,10 @@ public class MessageRoute implements Serializable {
this.message = message;
}
+ public void setMessage(Map message) {
+ this.message = new JSONObject(message);
+ }
+
public Long getTimestamp() {
return timestamp;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
new file mode 100644
index 0000000..93ce08a
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-analytics</artifactId>
+ <version>0.5.1</version>
+ </parent>
+ <artifactId>metron-profiler-spark</artifactId>
+ <url>https://metron.apache.org/</url>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>${global_spark_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>${global_spark_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr-runtime</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-profiler-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-hbase</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-hbase</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <!-- allows profiles to use the Stellar stats functions -->
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-statistics</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>kryo</artifactId>
+ <groupId>com.esotericsoftware</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${global_log4j_core_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${global_log4j_core_version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${global_shade_version}</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>com.tdunning</pattern>
+ <shadedPattern>org.apache.metron.tdunning</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <excludes>
+ <exclude>storm:storm-core:*</exclude>
+ <exclude>storm:storm-lib:*</exclude>
+ <exclude>org.slf4j.impl*</exclude>
+ <exclude>org.slf4j:slf4j-log4j*</exclude>
+ </excludes>
+ </artifactSet>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resources>
+ <resource>.yaml</resource>
+ <resource>LICENSE.txt</resource>
+ <resource>ASL2.0</resource>
+ <resource>NOTICE.txt</resource>
+ </resources>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
new file mode 100644
index 0000000..f999613
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.spark.function.GroupByPeriodFunction;
+import org.apache.metron.profiler.spark.function.HBaseWriterFunction;
+import org.apache.metron.profiler.spark.function.MessageRouterFunction;
+import org.apache.metron.profiler.spark.function.ProfileBuilderFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+import static org.apache.spark.sql.functions.sum;
+
+/**
+ * The 'Batch Profiler' that generates profiles by consuming data in batch from archived telemetry.
+ *
+ * <p>The Batch Profiler is executed in Spark.
+ */
/**
 * The 'Batch Profiler' that generates profiles by consuming data in batch from archived telemetry.
 *
 * <p>The Batch Profiler is executed in Spark.
 */
public class BatchProfiler implements Serializable {

  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /**
   * Execute the Batch Profiler.
   *
   * @param spark The spark session.
   * @param properties The profiler configuration properties.
   * @param globalProperties The global properties used to configure Stellar execution.
   * @param profiles The profile definitions.
   * @return The number of profile measurements produced.
   */
  public long run(SparkSession spark,
                  Properties properties,
                  Properties globalProperties,
                  ProfilerConfig profiles) {

    LOG.debug("Building {} profile(s)", profiles.getProfiles().size());
    Map<String, String> globals = Maps.fromProperties(globalProperties);

    String inputFormat = TELEMETRY_INPUT_FORMAT.get(properties, String.class);
    String inputPath = TELEMETRY_INPUT_PATH.get(properties, String.class);
    LOG.debug("Loading telemetry from '{}'", inputPath);

    // fetch the archived telemetry
    Dataset<String> telemetry = spark
            .read()
            .format(inputFormat)
            .load(inputPath)
            .as(Encoders.STRING());
    // NOTE(review): cache().count() is evaluated eagerly as the log argument, so a
    // full pass over the data runs even when DEBUG logging is disabled — consider
    // guarding these with LOG.isDebugEnabled() if the counts are debug-only.
    LOG.debug("Found {} telemetry record(s)", telemetry.cache().count());

    // find all routes for each message; a message may map to zero or more routes
    Dataset<MessageRoute> routes = telemetry
            .flatMap(new MessageRouterFunction(profiles, globals), Encoders.bean(MessageRoute.class));
    LOG.debug("Generated {} message route(s)", routes.cache().count());

    // build the profiles; routes are grouped by (profile, entity, period) before
    // each group is reduced to a single measurement
    Dataset<ProfileMeasurementAdapter> measurements = routes
            .groupByKey(new GroupByPeriodFunction(properties), Encoders.STRING())
            .mapGroups(new ProfileBuilderFunction(properties, globals), Encoders.bean(ProfileMeasurementAdapter.class));
    LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());

    // write the profile measurements to HBase; each partition emits the number of
    // records it wrote and the per-partition counts are summed for the total
    long count = measurements
            .mapPartitions(new HBaseWriterFunction(properties), Encoders.INT())
            .agg(sum("value"))
            .head()
            .getLong(0);
    LOG.debug("{} profile measurement(s) written to HBase", count);

    return count;
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
new file mode 100644
index 0000000..054806e
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Defines the configuration values recognized by the Batch Profiler.
+ */
+public enum BatchProfilerConfig {
+
+ PERIOD_DURATION_UNITS("profiler.period.duration.units", "MINUTES", String.class),
+
+ PERIOD_DURATION("profiler.period.duration", 15, Integer.class),
+
+ HBASE_SALT_DIVISOR("profiler.hbase.salt.divisor", 1000, Integer.class),
+
+ HBASE_TABLE_PROVIDER("profiler.hbase.table.provider", "org.apache.metron.hbase.HTableProvider", String.class),
+
+ HBASE_TABLE_NAME("profiler.hbase.table", "profiler", String.class),
+
+ HBASE_COLUMN_FAMILY("profiler.hbase.column.family", "P", String.class),
+
+ HBASE_WRITE_DURABILITY("profiler.hbase.durability", Durability.USE_DEFAULT, Durability.class),
+
+ TELEMETRY_INPUT_FORMAT("profiler.batch.input.format", "text", String.class),
+
+ TELEMETRY_INPUT_PATH("profiler.batch.input.path", "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*", String.class);
+
+ /**
+ * The key for the configuration value.
+ */
+ private String key;
+
+ /**
+ * The default value of the configuration, if none other is specified.
+ */
+ private Object defaultValue;
+
+ /**
+ * The type of the configuration value.
+ */
+ private Class<?> valueType;
+
+ BatchProfilerConfig(String key, Object defaultValue, Class<?> valueType) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+ this.valueType = valueType;
+ }
+
+ /**
+ * Returns the key of the configuration value.
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Returns the default value of the configuration.
+ */
+ public Object getDefault() {
+ return getDefault(valueType);
+ }
+
+ /**
+ * Returns the default value of the configuration, cast to the expected type.
+ *
+ * @param clazz The class of the expected type of the configuration value.
+ * @param <T> The expected type of the configuration value.
+ */
+ public <T> T getDefault(Class<T> clazz) {
+ return defaultValue == null ? null: ConversionUtils.convert(defaultValue, clazz);
+ }
+
+ /**
+ * Returns the configuration value from a map of configuration values.
+ *
+ * @param config A map containing configuration values.
+ */
+ public Object get(Map<String, String> config) {
+ return getOrDefault(config, defaultValue);
+ }
+
+ /**
+ * Returns the configuration value from a map of configuration values.
+ *
+ * @param properties Configuration properties.
+ */
+ public Object get(Properties properties) {
+ return getOrDefault(properties, defaultValue);
+ }
+
+ /**
+ * Returns the configuration value from a map of configuration values, cast to the expected type.
+ *
+ * @param config A map containing configuration values.
+ */
+ public <T> T get(Map<String, String> config, Class<T> clazz) {
+ return getOrDefault(config, defaultValue, clazz);
+ }
+
+ /**
+ * Returns the configuration value from a map of configuration values, cast to the expected type.
+ *
+ * @param properties Configuration properties.
+ */
+ public <T> T get(Properties properties, Class<T> clazz) {
+ return getOrDefault(properties, defaultValue, clazz);
+ }
+
+ /**
+ * Returns the configuration value from a map of configuration values. If the value is not specified,
+ * the default value is returned.
+ *
+ * @param config A map containing configuration values.
+ * @param defaultValue The default value to return, if one is not defined.
+ * @return The configuration value or the specified default, if one is not defined.
+ */
+ private Object getOrDefault(Map<String, String> config, Object defaultValue) {
+ return getOrDefault(config, defaultValue, valueType);
+ }
+
+ /**
+ * Returns the configuration value from a map of configuration values. If the value is not specified,
+ * the default value is returned.
+ *
+ * @param properties A map containing configuration values.
+ * @param defaultValue The default value to return, if one is not defined.
+ * @return The configuration value or the specified default, if one is not defined.
+ */
+ private Object getOrDefault(Properties properties, Object defaultValue) {
+ return getOrDefault(properties, defaultValue, valueType);
+ }
+
+ /**
+ * Returns the configuration value, cast to the expected type, from a map of configuration values.
+ * If the value is not specified, the default value is returned.
+ *
+ * @param config A map containing configuration values.
+ * @param defaultValue The default value to return, if one is not defined.
+ * @param clazz The class of the expected type of the configuration value.
+ * @param <T> The expected type of the configuration value.
+ * @return The configuration value or the specified default, if one is not defined.
+ */
+ private <T> T getOrDefault(Map<String, String> config, Object defaultValue, Class<T> clazz) {
+ Object value = config.getOrDefault(key, defaultValue.toString());
+ return value == null ? null : ConversionUtils.convert(value, clazz);
+ }
+
+ /**
+ * Returns the configuration value, cast to the expected type, from a map of configuration values.
+ * If the value is not specified, the default value is returned.
+ *
+ * @param properties Configuration properties.
+ * @param defaultValue The default value to return, if one is not defined.
+ * @param clazz The class of the expected type of the configuration value.
+ * @param <T> The expected type of the configuration value.
+ * @return The configuration value or the specified default, if one is not defined.
+ */
+ private <T> T getOrDefault(Properties properties, Object defaultValue, Class<T> clazz) {
+ Object value = properties.getOrDefault(key, defaultValue);
+ return value == null ? null : ConversionUtils.convert(value, clazz);
+ }
+
+ @Override
+ public String toString() {
+ return key;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
new file mode 100644
index 0000000..5da7d04
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An adapter for the {@link ProfileMeasurement} class so that the data
+ * can be serialized as required by Spark.
+ *
+ * <p>The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object` well. This
+ * adapter encodes the profile's result as byte[] rather than an Object to work around this.
+ */
+public class ProfileMeasurementAdapter implements Serializable {
+
+ /**
+ * The name of the profile that this measurement is associated with.
+ */
+ private String profileName;
+
+ /**
+ * The name of the entity being profiled.
+ */
+ private String entity;
+
+ /**
+ * A monotonically increasing number identifying the period. The first period is 0
+ * and began at the epoch.
+ */
+ private Long periodId;
+
+ /**
+ * The duration of each period in milliseconds.
+ */
+ private Long durationMillis;
+
+ /**
+ * The result of evaluating the profile expression.
+ *
+ * The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object`. This
+ * adapter encodes the profile's result as `byte[]` rather than an `Object` to work around this.
+ */
+ private byte[] profileValue;
+
+ public ProfileMeasurementAdapter() {
+ // default constructor required for serialization in Spark
+ }
+
+ public ProfileMeasurementAdapter(ProfileMeasurement measurement) {
+ this.profileName = measurement.getProfileName();
+ this.entity = measurement.getEntity();
+ this.periodId = measurement.getPeriod().getPeriod();
+ this.durationMillis = measurement.getPeriod().getDurationMillis();
+ this.profileValue = SerDeUtils.toBytes(measurement.getProfileValue());
+ }
+
+ public ProfileMeasurement toProfileMeasurement() {
+ ProfilePeriod period = ProfilePeriod.fromPeriodId(periodId, durationMillis, TimeUnit.MILLISECONDS);
+ ProfileMeasurement measurement = new ProfileMeasurement()
+ .withProfileName(profileName)
+ .withEntity(entity)
+ .withPeriod(period)
+ .withProfileValue(SerDeUtils.fromBytes(profileValue, Object.class));
+ return measurement;
+ }
+
+ public String getProfileName() {
+ return profileName;
+ }
+
+ public void setProfileName(String profileName) {
+ this.profileName = profileName;
+ }
+
+ public String getEntity() {
+ return entity;
+ }
+
+ public void setEntity(String entity) {
+ this.entity = entity;
+ }
+
+ public Long getPeriodId() {
+ return periodId;
+ }
+
+ public void setPeriodId(Long periodId) {
+ this.periodId = periodId;
+ }
+
+ public Long getDurationMillis() {
+ return durationMillis;
+ }
+
+ public void setDurationMillis(Long durationMillis) {
+ this.durationMillis = durationMillis;
+ }
+
+ public byte[] getProfileValue() {
+ return profileValue;
+ }
+
+ public void setProfileValue(byte[] profileValue) {
+ this.profileValue = profileValue;
+ }
+
+ public void setProfileValue(Object profileValue) {
+ this.profileValue = SerDeUtils.toBytes(profileValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
new file mode 100644
index 0000000..1b602f4
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/GroupByPeriodFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.spark.api.java.function.MapFunction;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * Defines how {@link MessageRoute} are grouped.
+ *
+ * The routes are grouped by (profile, entity, periodId) so that all of the required
+ * messages are available to produce a {@link org.apache.metron.profiler.ProfileMeasurement}.
+ */
+public class GroupByPeriodFunction implements MapFunction<MessageRoute, String> {
+
+ /**
+ * The duration of each profile period.
+ */
+ private int periodDuration;
+
+ /**
+ * The units of the period duration.
+ */
+ private TimeUnit periodDurationUnits;
+
+ public GroupByPeriodFunction(Properties profilerProperties) {
+ periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(profilerProperties, String.class));
+ periodDuration = PERIOD_DURATION.get(profilerProperties, Integer.class);
+ }
+
+ @Override
+ public String call(MessageRoute route) {
+ ProfilePeriod period = ProfilePeriod.fromTimestamp(route.getTimestamp(), periodDuration, periodDurationUnits);
+ return route.getProfileDefinition().getProfile() + "-" + route.getEntity() + "-" + period.getPeriod();
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
new file mode 100644
index 0000000..cfabd94
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -0,0 +1,171 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.commons.collections4.IteratorUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_SALT_DIVISOR;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_WRITE_DURABILITY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * Writes the profile measurements to HBase in Spark.
+ */
/**
 * Writes the profile measurements to HBase in Spark.
 */
public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurementAdapter, Integer> {

  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // provides the HBase table connection; replaceable via withTableProviderImpl (e.g. for tests)
  private TableProvider tableProvider;

  /**
   * The name of the HBase table to write to.
   */
  private String tableName;

  /**
   * The durability guarantee when writing to HBase.
   */
  private Durability durability;

  /**
   * Builds the HBase row key.
   */
  private RowKeyBuilder rowKeyBuilder;

  /**
   * Assembles the columns for HBase.
   */
  private ColumnBuilder columnBuilder;

  /**
   * @param properties The profiler configuration properties that define the table,
   *                   durability, row key, and column settings.
   */
  public HBaseWriterFunction(Properties properties) {
    tableName = HBASE_TABLE_NAME.get(properties, String.class);
    durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);

    // row key builder; the period settings must match those used when the
    // measurements were built, otherwise the keys will not align on read
    int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
    int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);

    // column builder
    String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);

    // hbase table provider
    String providerImpl = HBASE_TABLE_PROVIDER.get(properties, String.class);
    tableProvider = createTableProvider(providerImpl);
  }

  /**
   * Writes a set of measurements to HBase.
   *
   * @param iterator The measurements to write.
   * @return A single-element iterator containing the number of measurements written.
   */
  @Override
  public Iterator<Integer> call(Iterator<ProfileMeasurementAdapter> iterator) throws Exception {
    int count = 0;
    LOG.debug("About to write profile measurement(s) to HBase");

    // do not open hbase connection, if nothing to write
    List<ProfileMeasurementAdapter> measurements = IteratorUtils.toList(iterator);
    if(measurements.size() > 0) {

      // open an HBase connection
      Configuration config = HBaseConfiguration.create();
      try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {

        // queue one mutation per measurement, then flush them in a single batch
        for (ProfileMeasurementAdapter adapter : measurements) {
          ProfileMeasurement m = adapter.toProfileMeasurement();
          client.addMutation(rowKeyBuilder.rowKey(m), columnBuilder.columns(m), durability);
        }
        count = client.mutate();

      } catch (IOException e) {
        LOG.error("Unable to open connection to HBase", e);
        throw new RuntimeException(e);
      }
    }

    LOG.debug("{} profile measurement(s) written to HBase", count);
    return IteratorUtils.singletonIterator(count);
  }

  /**
   * Set the {@link TableProvider} using the class name of the provider.
   * @param providerImpl The name of the class.
   * @return This function, to allow method chaining.
   */
  public HBaseWriterFunction withTableProviderImpl(String providerImpl) {
    this.tableProvider = createTableProvider(providerImpl);
    return this;
  }

  /**
   * Creates a TableProvider based on a class name.
   * @param providerImpl The class name of a TableProvider
   * @return The provider instance; a default {@link HTableProvider} when the class
   *         name is missing or unusable.
   */
  private static TableProvider createTableProvider(String providerImpl) {
    LOG.trace("Creating table provider; className={}", providerImpl);

    // if class name not defined, use a reasonable default
    // NOTE(review): the '$' check presumably guards against an unresolved property
    // placeholder (e.g. "${...}") being passed through as the class name — confirm
    if(StringUtils.isEmpty(providerImpl) || providerImpl.charAt(0) == '$') {
      return new HTableProvider();
    }

    // instantiate the table provider; the cast is unchecked — a class that does not
    // implement TableProvider will only fail here at runtime
    try {
      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(providerImpl);
      return clazz.getConstructor().newInstance();

    } catch (InstantiationException | IllegalAccessException | IllegalStateException |
        InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
      throw new IllegalStateException("Unable to instantiate connector", e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
new file mode 100644
index 0000000..cf8029f
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/MessageRouterFunction.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.DefaultMessageRouter;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.MessageRouter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The function responsible for finding routes for a given message in Spark.
+ */
+public class MessageRouterFunction implements FlatMapFunction<String, MessageRoute> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * The global configuration used for the execution of Stellar.
+ */
+ private Map<String, String> globals;
+
+ /**
+ * The profile definitions.
+ */
+ private ProfilerConfig profilerConfig;
+
+ public MessageRouterFunction(ProfilerConfig profilerConfig, Map<String, String> globals) {
+ this.profilerConfig = profilerConfig;
+ this.globals = globals;
+ }
+
+ /**
+ * Find all routes for a given telemetry message.
+ *
+ * <p>A message may need routed to multiple profiles should it be needed by more than one. A
+ * message may also not be routed should it not be needed by any profiles.
+ *
+ * @param jsonMessage The raw JSON message.
+ * @return A list of message routes.
+ */
+ @Override
+ public Iterator<MessageRoute> call(String jsonMessage) throws Exception {
+ List<MessageRoute> routes;
+
+ JSONParser parser = new JSONParser();
+ Context context = TaskUtils.getContext(globals);
+ MessageRouter router = new DefaultMessageRouter(context);
+
+ // parse the raw message
+ Optional<JSONObject> message = toMessage(jsonMessage, parser);
+ if(message.isPresent()) {
+
+ // find all routes
+ routes = router.route(message.get(), profilerConfig, context);
+ LOG.trace("Found {} route(s) for a message", routes.size());
+
+ } else {
+ // the message is not valid and must be ignored
+ routes = Collections.emptyList();
+ LOG.trace("No route possible. Unable to parse message.");
+ }
+
+ return routes.iterator();
+ }
+
+ /**
+ * Parses the raw JSON of a message.
+ *
+ * @param json The raw JSON to parse.
+ * @param parser The parser to use.
+ * @return The parsed telemetry message.
+ */
+ private static Optional<JSONObject> toMessage(String json, JSONParser parser) {
+ try {
+ JSONObject message = (JSONObject) parser.parse(json);
+ return Optional.of(message);
+
+ } catch(Throwable e) {
+ LOG.warn(String.format("Unable to parse message, message will be ignored; message='%s'", json), e);
+ return Optional.empty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
new file mode 100644
index 0000000..273695b
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.profiler.DefaultMessageDistributor;
+import org.apache.metron.profiler.MessageDistributor;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Comparator.comparing;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
+
+/**
+ * The function responsible for building profiles in Spark.
+ */
+public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurementAdapter> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** The duration of each profile period in milliseconds. */
+  private long periodDurationMillis;
+
+  /** The global configuration used when executing Stellar. */
+  private Map<String, String> globals;
+
+  public ProfileBuilderFunction(Properties properties, Map<String, String> globals) {
+    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
+    int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
+    this.periodDurationMillis = periodDurationUnits.toMillis(periodDuration);
+    this.globals = globals;
+  }
+
+  /**
+   * Build a profile from a set of message routes.
+   *
+   * <p>This assumes that all of the necessary routes have been provided.
+   *
+   * @param group The group identifier.
+   * @param iterator The message routes.
+   * @return The profile measurement produced by applying all routes in the group.
+   */
+  @Override
+  public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator) throws Exception {
+    // create the distributor; some settings are unnecessary because it is cleaned-up immediately after processing the batch
+    int maxRoutes = Integer.MAX_VALUE;
+    long profileTTLMillis = Long.MAX_VALUE;
+    MessageDistributor distributor = new DefaultMessageDistributor(periodDurationMillis, profileTTLMillis, maxRoutes);
+    Context context = TaskUtils.getContext(globals);
+
+    // sort the messages/routes by timestamp so they are applied in event order
+    List<MessageRoute> routes = toStream(iterator)
+            .sorted(comparing(MessageRoute::getTimestamp))
+            .collect(Collectors.toList());
+    LOG.debug("Building a profile for group '{}' from {} message(s)", group, routes.size());
+
+    // apply each message/route to build the profile
+    for(MessageRoute route: routes) {
+      distributor.distribute(route, context);
+    }
+
+    // flush the profile; exactly one measurement is expected per group. Using an exact
+    // check avoids an opaque IndexOutOfBoundsException below when the flush is empty.
+    List<ProfileMeasurement> measurements = distributor.flush();
+    if(measurements.size() != 1) {
+      throw new IllegalStateException(
+              String.format("Expected exactly 1 profile measurement, but got %d", measurements.size()));
+    }
+
+    ProfileMeasurement m = measurements.get(0);
+    LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}",
+            m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue());
+    return new ProfileMeasurementAdapter(m);
+  }
+
+  /**
+   * Converts an {@link Iterator} to a sequential {@link Stream}.
+   */
+  private static <T> Stream<T> toStream(Iterator<T> iterator) {
+    Iterable<T> iterable = () -> iterator;
+    return StreamSupport.stream(iterable.spliterator(), false);
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java
new file mode 100644
index 0000000..d401f12
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/TaskUtils.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Utilities useful when executing Profiler tasks in Spark.
+ */
+public class TaskUtils implements Serializable {
+
+  /**
+   * Utility class with only static members; do not instantiate.
+   */
+  private TaskUtils() {
+  }
+
+  /**
+   * Create the execution context for running Stellar.
+   *
+   * @param globals The global configuration.
+   * @return The initialized Stellar execution context.
+   */
+  public static Context getContext(Map<String, String> globals) {
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
+            .with(Context.Capabilities.STELLAR_CONFIG, () -> globals)
+            .build();
+    StellarFunctions.initialize(context);
+    return context;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
new file mode 100644
index 0000000..f560740
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * An integration test that runs the Batch Profiler end-to-end against a mock HBase table.
+ */
+public class BatchProfilerIntegrationTest {
+
+  private static SparkSession spark;
+  private MockHTable profilerTable;
+  private Properties profilerProperties;
+
+  /**
+   * Starts a local Spark session that is shared by all tests in this class.
+   */
+  @BeforeClass
+  public static void setupSpark() {
+    SparkConf conf = new SparkConf();
+    conf.setMaster("local");
+    conf.setAppName("BatchProfilerIntegrationTest");
+    conf.set("spark.sql.shuffle.partitions", "8");
+    spark = SparkSession.builder().config(conf).getOrCreate();
+  }
+
+  @AfterClass
+  public static void tearDownSpark() {
+    if(spark != null) {
+      spark.close();
+    }
+  }
+
+  @Before
+  public void setup() {
+    profilerProperties = new Properties();
+
+    // define the source of the input telemetry
+    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
+    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");
+
+    // define where the output will go
+    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
+    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
+    profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+
+    // create the mock hbase table
+    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
+  }
+
+  @Test
+  public void testBatchProfiler() {
+    // run the batch profiler
+    BatchProfiler profiler = new BatchProfiler();
+    profiler.run(spark, profilerProperties, getGlobals(), getProfile());
+
+    // validate what was written to the mock table
+    List<Put> puts = profilerTable.getPutLog();
+    assertEquals(2, puts.size());
+  }
+
+  /**
+   * Returns the profile definition used for testing.
+   */
+  private ProfilerConfig getProfile() {
+    ProfileConfig profile = new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+    return new ProfilerConfig().withProfile(profile);
+  }
+
+  /**
+   * Returns the global configuration used for testing.
+   */
+  private Properties getGlobals() {
+    return new Properties();
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bfbf018/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
new file mode 100644
index 0000000..55f3e21
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -0,0 +1,176 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.function;
+
+import org.apache.commons.collections4.IteratorUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+
+/**
+ * Tests the {@code HBaseWriterFunction} against a mock HBase table.
+ */
+public class HBaseWriterFunctionTest {
+
+  Properties profilerProperties;
+
+  @Before
+  public void setup() {
+    profilerProperties = getProfilerProperties();
+
+    // create a mock table for HBase
+    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
+    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
+    MockHBaseTableProvider.addToCache(tableName, columnFamily);
+  }
+
+  @Test
+  public void testWrite() throws Exception {
+    JSONObject message = getMessage();
+    String entity = (String) message.get("ip_src_addr");
+    long timestamp = (Long) message.get("timestamp");
+
+    // setup a single profile measurement, then write it
+    List<ProfileMeasurementAdapter> measurements = createMeasurements(1, entity, timestamp, getProfile());
+    Iterator<Integer> results = createFunction().call(measurements.iterator());
+
+    // validate the result; one partition count of 1 write
+    List<Integer> counts = IteratorUtils.toList(results);
+    Assert.assertEquals(1, counts.size());
+    Assert.assertEquals(1, counts.get(0).intValue());
+  }
+
+  @Test
+  public void testWriteMany() throws Exception {
+    JSONObject message = getMessage();
+    String entity = (String) message.get("ip_src_addr");
+    long timestamp = (Long) message.get("timestamp");
+
+    // setup 10 profile measurements, then write them
+    List<ProfileMeasurementAdapter> measurements = createMeasurements(10, entity, timestamp, getProfile());
+    Iterator<Integer> results = createFunction().call(measurements.iterator());
+
+    // validate the result; one partition count of 10 writes
+    List<Integer> counts = IteratorUtils.toList(results);
+    Assert.assertEquals(1, counts.size());
+    Assert.assertEquals(10, counts.get(0).intValue());
+  }
+
+  @Test
+  public void testWriteNone() throws Exception {
+    // there are no profile measurements to write
+    List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+    Iterator<Integer> results = createFunction().call(measurements.iterator());
+
+    // validate the result; one partition count of 0 writes
+    List<Integer> counts = IteratorUtils.toList(results);
+    Assert.assertEquals(1, counts.size());
+    Assert.assertEquals(0, counts.get(0).intValue());
+  }
+
+  /**
+   * Creates the function under test, backed by the mock HBase table provider.
+   */
+  private HBaseWriterFunction createFunction() {
+    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
+    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+    return function;
+  }
+
+  /**
+   * Create a list of measurements for testing.
+   *
+   * @param count The number of messages to create.
+   * @param entity The entity.
+   * @param timestamp The timestamp.
+   * @param profile The profile definition.
+   * @return The measurements, each wrapped in an adapter.
+   */
+  private List<ProfileMeasurementAdapter> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
+    List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+    for(int i=0; i<count; i++) {
+      ProfileMeasurement measurement = new ProfileMeasurement()
+              .withProfileName(profile.getProfile())
+              .withEntity(entity)
+              .withPeriod(timestamp, 15, TimeUnit.MINUTES);
+      measurements.add(new ProfileMeasurementAdapter(measurement));
+    }
+    return measurements;
+  }
+
+  /**
+   * Returns a telemetry message to use for testing.
+   */
+  private JSONObject getMessage() {
+    JSONObject message = new JSONObject();
+    message.put("ip_src_addr", "192.168.1.1");
+    message.put("status", "red");
+    message.put("timestamp", System.currentTimeMillis());
+    return message;
+  }
+
+  /**
+   * Returns profiler properties to use for testing.
+   */
+  private Properties getProfilerProperties() {
+    return new Properties();
+  }
+
+  /**
+   * Returns a profile definition to use for testing.
+   */
+  private ProfileConfig getProfile() {
+    return new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withUpdate("count", "count + 1")
+            .withResult("count");
+  }
+}