You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/08/27 18:48:24 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2169 Upgrade to Kafka 2.0.0 and Storm 1.2.1 (merrimanr) closes
apache/metron#1490
This is an automated email from the ASF dual-hosted git repository.
rmerriman pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new 15e5276 METRON-2169 Upgrade to Kafka 2.0.0 and Storm 1.2.1 (merrimanr) closes apache/metron#1490
15e5276 is described below
commit 15e527601eee02d33f0a93388471e9525cbd927d
Author: merrimanr <me...@gmail.com>
AuthorDate: Tue Aug 27 13:48:09 2019 -0500
METRON-2169 Upgrade to Kafka 2.0.0 and Storm 1.2.1 (merrimanr) closes apache/metron#1490
---
dependencies_with_url.csv | 32 +++++++
metron-analytics/metron-maas-service/pom.xml | 4 +
metron-analytics/metron-profiler-repl/pom.xml | 5 +
metron-analytics/metron-profiler-spark/pom.xml | 6 ++
metron-analytics/metron-profiler-storm/pom.xml | 30 +++---
.../storm/integration/ProfilerIntegrationTest.java | 10 +-
metron-contrib/metron-performance/pom.xml | 2 +-
.../ansible/roles/metron-builder/tasks/build.yml | 2 +-
.../ansible/roles/metron-builder/tasks/clean.yml | 2 +-
metron-interface/metron-rest/pom.xml | 22 +++--
.../org/apache/metron/rest/config/KafkaConfig.java | 63 ++++++-------
.../apache/metron/rest/config/ZookeeperConfig.java | 9 --
.../metron/rest/service/impl/KafkaServiceImpl.java | 59 ++++++------
.../org/apache/metron/rest/config/TestConfig.java | 33 ++++---
.../rest/service/impl/KafkaServiceImplTest.java | 69 +++++---------
metron-platform/metron-common/pom.xml | 8 +-
.../org/apache/metron/common/utils/KafkaUtils.java | 8 +-
.../metron-elasticsearch-common/pom.xml | 21 +----
.../metron-enrichment-storm/pom.xml | 19 ++--
metron-platform/metron-enrichment/pom.xml | 6 ++
metron-platform/metron-hbase-server/pom.xml | 6 ++
.../metron-indexing/metron-indexing-common/pom.xml | 6 ++
.../metron-indexing/metron-indexing-storm/pom.xml | 5 +
metron-platform/metron-integration-test/pom.xml | 15 ++-
.../integration/components/KafkaComponent.java | 104 +++++++++++----------
metron-platform/metron-management/pom.xml | 22 ++---
.../metron-parsing/metron-parsers-common/pom.xml | 21 ++---
.../metron-parsing/metron-parsers/pom.xml | 11 ++-
.../metron/parsers/topology/ParserTopologyCLI.java | 2 +-
metron-platform/metron-pcap-backend/pom.xml | 15 ---
.../integration/PcapTopologyIntegrationTest.java | 9 +-
metron-platform/metron-pcap/pom.xml | 29 +-----
.../metron-solr/metron-solr-common/pom.xml | 19 +---
.../metron-solr/metron-solr-storm/pom.xml | 48 +++++-----
.../metron-storm-kafka-override/pom.xml | 24 -----
metron-platform/metron-storm-kafka/pom.xml | 24 -----
metron-platform/metron-test-utilities/pom.xml | 2 +-
.../metron-writer/metron-writer-common/pom.xml | 2 +-
.../apache/metron/writer/kafka/KafkaWriter.java | 10 +-
.../metron/writer/kafka/KafkaWriterTest.java | 4 +-
metron-stellar/stellar-zeppelin/pom.xml | 6 ++
pom.xml | 26 ++----
42 files changed, 379 insertions(+), 441 deletions(-)
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 0065f9b..9d7b0fa 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -5,6 +5,8 @@ org.objenesis:objenesis:jar:2.1:compile,Apache v2,http://objenesis.org/
org.ow2.asm:asm:jar:4.1:compile,BSD,http://asm.ow2.org/
org.ow2.asm:asm:jar:5.0.3:compile,BSD,http://asm.ow2.org/
org.ow2.asm:asm:jar:5.0.4:compile,BSD,http://asm.ow2.org/
+asm:asm-commons:jar:3.1:compile,BSD,https://asm.ow2.io/
+asm:asm-tree:jar:3.1:compile,BSD,https://asm.ow2.io/
org.reflections:reflections:jar:0.9.10:compile,BSD,https://github.com/ronmamo/reflections
org.javassist:javassist:jar:3.19.0-GA:compile,Apache v2,https://github.com/jboss-javassist/javassist
org.javassist:javassist:jar:3.17.1-GA:compile,Apache v2,https://github.com/jboss-javassist/javassist
@@ -43,6 +45,7 @@ javax.annotation:jsr250-api:jar:1.0:compile,COMMON DEVELOPMENT AND DISTRIBUTION
javax.annotation:javax.annotation-api:jar:1.3.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
javax.annotation:javax.annotation-api:jar:1.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
+javax.mail:mail:jar:1.4.1:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net
javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile,CDDL 1.1,https://github.com/jax-rs/api
javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/
@@ -61,6 +64,8 @@ net.sf.saxon:Saxon-HE:jar:9.5.1-5:compile,Mozilla Public License Version 2.0,htt
org.abego.treelayout:org.abego.treelayout.core:jar:1.0.1:compile,BSD 3-Clause "New" or "Revised" License (BSD-3-Clause),http://code.google.com/p/treelayout/
org.adrianwalker:multiline-string:jar:0.1.2:compile,Common Public License Version 1.0,https://github.com/benelog/multiline
org.antlr:antlr4-runtime:jar:4.5:compile,BSD 3-Clause License,http://www.antlr.org
+org.antlr:ST4:jar:4.0.4:compile,BSD 3-Clause License,http://www.antlr.org
+org.antlr:stringtemplate:jar:3.2.1:compile,BSD 3-Clause License,http://www.antlr.org
org.bouncycastle:bcprov-jdk15on:jar:1.52:compile,MIT,https://www.bouncycastle.org/license.html
org.bouncycastle:bcpkix-jdk15on:jar:1.52:compile,MIT,https://www.bouncycastle.org/license.html
org.clojure:clojure:jar:1.6.0:compile,Eclipse Public License 1.0,http://clojure.org/
@@ -88,6 +93,8 @@ org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile,BSD-like,http://www.scal
org.scala-lang:scala-compiler:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
org.scala-lang:scala-library:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/
org.scala-lang:scala-reflect:jar:2.11.8:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-library:jar:2.12.6:compile,BSD-like,http://www.scala-lang.org/
+org.scala-lang:scala-reflect:jar:2.12.6:compile,BSD-like,http://www.scala-lang.org/
org.scala-lang:scalap:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
oro:oro:jar:2.0.8:compile,ASLv2,http://attic.apache.org/projects/jakarta-oro.html
xmlenc:xmlenc:jar:0.52:compile,The BSD License,http://xmlenc.sourceforge.net
@@ -135,6 +142,7 @@ org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
org.slf4j:jul-to-slf4j:jar:1.7.25:compile,MIT,http://www.slf4j.org
aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net
com.101tec:zkclient:jar:0.8:compile,The Apache Software License, Version 2.0,https://github.com/sgroschupf/zkclient
+com.101tec:zkclient:jar:0.10:compile,The Apache Software License, Version 2.0,https://github.com/sgroschupf/zkclient
com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile,Apache License, Version 2.0,http://stephenc.github.com/findbugs-annotations
com.github.tony19:named-regexp:jar:0.2.3:compile,Apache License, Version 2.0,
com.google.code.findbugs:jsr305:jar:1.3.9:compile,The Apache Software License, Version 2.0,http://findbugs.sourceforge.net/
@@ -166,6 +174,7 @@ com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://githu
com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForCbor
com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile
com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson
@@ -191,6 +200,7 @@ com.google.code.gson:gson:jar:2.8.5:compile,The Apache Software License, Version
com.google.guava:guava:jar:11.0.2:compile,ASLv2,
com.google.guava:guava:jar:12.0.1:compile,ASLv2,
com.google.guava:guava:jar:12.0:compile,ASLv2,
+com.google.guava:guava:jar:14.0.1:compile,ASLv2,
com.google.guava:guava:jar:16.0.1:compile,ASLv2,
com.google.guava:guava:jar:17.0:compile,ASLv2,
com.google.guava:guava:jar:18.0:compile,ASLv2,
@@ -229,12 +239,14 @@ commons-configuration:commons-configuration:jar:1.10:compile,ASLv2,http://common
commons-configuration:commons-configuration:jar:1.6:compile,The Apache Software License, Version 2.0,http://commons.apache.org/${pom.artifactId.substring(8)}/
commons-configuration:commons-configuration:jar:1.9:compile,The Apache Software License, Version 2.0,http://commons.apache.org/${pom.artifactId.substring(8)}/
commons-daemon:commons-daemon:jar:1.0.13:compile,ASLv2,http://commons.apache.org/daemon/
+commons-dbcp:commons-dbcp:jar:1.4:compile,ASLv2,http://commons.apache.org/proper/commons-dbcp/
commons-digester:commons-digester:jar:1.8.1:compile,ASLv2,http://commons.apache.org/digester/
commons-digester:commons-digester:jar:1.8:compile,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/digester/
commons-digester:commons-digester:jar:2.1:compile,ASLv2,http://commons.apache.org/digester/
commons-el:commons-el:jar:1.0:provided,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
commons-el:commons-el:jar:1.0:runtime,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
commons-el:commons-el:jar:1.0:compile,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
+commons-httpclient:commons-httpclient:jar:3.0.1:compile,Apache License,http://jakarta.apache.org/httpcomponents/httpclient-3.x/
commons-httpclient:commons-httpclient:jar:3.1:compile,Apache License,http://jakarta.apache.org/httpcomponents/httpclient-3.x/
commons-io:commons-io:jar:2.4:compile,ASLv2,http://commons.apache.org/io/
commons-io:commons-io:jar:2.5:compile,ASLv2,http://commons.apache.org/io/
@@ -249,6 +261,7 @@ commons-logging:commons-logging:jar:1.2:compile,ASLv2,http://commons.apache.org/
commons-net:commons-net:jar:2.2:compile,ASLv2,http://commons.apache.org/net/
commons-net:commons-net:jar:3.1:compile,ASLv2,http://commons.apache.org/net/
commons-net:commons-net:jar:3.1:provided,ASLv2,http://commons.apache.org/net/
+commons-pool:commons-pool:jar:1.5.4:compile,ASLv2,http://commons.apache.org/proper/commons-pool/
commons-text:commons-text:jar:1.1:compile,ASLv2,http://commons.apache.org/proper/commons-text/
commons-validator:commons-validator:jar:1.4.0:compile,ASLv2,http://commons.apache.org/validator/
commons-validator:commons-validator:jar:1.5.1:compile,ASLv2,http://commons.apache.org/proper/commons-validator/
@@ -257,7 +270,10 @@ et.razorvine:pyrolite:jar:4.13:compile,MIT,https://github.com/irmen/Pyrolite
io.airlift:aircompressor:jar:0.8:compile,ASLv2,https://github.com/airlift/aircompressor
io.confluent:kafka-avro-serializer:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
io.confluent:kafka-schema-registry-client:jar:1.0:compile,ASLv2,https://github.com/confluentinc/schema-registry/
+io.dropwizard.metrics:metrics-core:jar:3.1.0:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-core:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-ganglia:jar:3.1.0:compile,ASLv2,https://github.com/dropwizard/metrics
+io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
@@ -294,6 +310,7 @@ org.eclipse.jetty:jetty-security:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.o
org.eclipse.jetty:jetty-server:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-servlet:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
org.eclipse.jetty:jetty-util:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
+org.eclipse.jetty.aggregate:jetty-all:jar:7.6.0.v20120127:compile,ASLv2,http://www.eclipse.org/jetty
org.elasticsearch:elasticsearch:jar:2.3.3:compile,ASLv2,
org.elasticsearch:elasticsearch:jar:2.3.3:import,ASLv2,
org.elasticsearch:securesm:jar:1.0:compile,ASLv2,
@@ -354,6 +371,7 @@ org.tukaani:xz:jar:1.0:compile,Public Domain,http://tukaani.org/xz/java.html
org.xerial.snappy:snappy-java:jar:1.0.4.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/snappy-java/
org.xerial.snappy:snappy-java:jar:1.1.1.7:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
org.xerial.snappy:snappy-java:jar:1.1.2.6:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
+org.xerial.snappy:snappy-java:jar:1.1.7.1:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
org.yaml:snakeyaml:jar:1.11:compile,Apache License Version 2.0,http://www.snakeyaml.org
org.yaml:snakeyaml:jar:1.15:compile,Apache License Version 2.0,http://www.snakeyaml.org
org.yaml:snakeyaml:jar:1.17:compile,Apache License Version 2.0,http://www.snakeyaml.org
@@ -374,6 +392,7 @@ io.springfox:springfox-swagger2:jar:2.5.0:compile,ASLv2,https://github.com/sprin
io.swagger:swagger-annotations:jar:1.5.9:compile,ASLv2,https://github.com/swagger-api/swagger-core
io.swagger:swagger-models:jar:1.5.9:compile,ASLv2,https://github.com/swagger-api/swagger-core
javax.transaction:javax.transaction-api:jar:1.2:compile,CDDL-1.0,https://java.net/projects/jta-spec/
+javax.transaction:jta:jar:1.1:compile,CDDL-1.0,https://java.net/projects/jta-spec/
javax.validation:validation-api:jar:1.1.0.Final:compile,ASLv2,http://beanvalidation.org
javax.validation:validation-api:jar:2.0.1.Final:compile,ASLv2,http://beanvalidation.org
joda-time:joda-time:jar:2.9.4:compile,ASLv2,https://github.com/JodaOrg/joda-time
@@ -446,6 +465,9 @@ io.netty:netty-resolver:jar:4.1.13.Final:compile,ASLv2,http://netty.io/
io.netty:netty-transport:jar:4.1.13.Final:compile,ASLv2,http://netty.io/
joda-time:joda-time:jar:2.9.5:compile,ASLv2,https://github.com/JodaOrg/joda-time
net.sf.jopt-simple:jopt-simple:jar:5.0.2:compile,The MIT License,http://jopt-simple.sourceforge.net
+net.sf.jopt-simple:jopt-simple:jar:5.0.4:compile,The MIT License,http://jopt-simple.sourceforge.net
+net.sf.jpam:jpam:jar:1.1:compile,ASLv2,http://jpam.sourceforge.net/
+net.sf.opencsv:opencsv:jar:2.3:compile,ASLv2,http://opencsv.sourceforge.net/
org.elasticsearch.client:elasticsearch-rest-client:jar:5.6.14:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
org.elasticsearch.client:transport:jar:5.6.14:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
org.elasticsearch:elasticsearch:jar:5.6.14:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
@@ -509,3 +531,13 @@ com.github.stephenc.jcip:jcip-annotations:jar:1.0-1:compile,ASLv2,http://stephen
com.nimbusds:nimbus-jose-jwt:jar:4.41.2:compile,ASLv2,https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home
tomcat:jasper-compiler:jar:5.5.23:compile,ASLv2,https://tomcat.apache.org/
org.danilopianini:gson-extras:jar:0.2.1:compile,ASLv2,https://github.com/DanySK/gson-extras
+com.typesafe.scala-logging:scala-logging_2.12:jar:3.9.0:compile,ASLv2,https://github.com/lightbend/scala-logging
+info.ganglia.gmetric4j:gmetric4j:jar:1.0.7:compile,BSD,https://github.com/ganglia/gmetric4j
+javax.jdo:jdo-api:jar:3.0.1:compile,ASLv2,http://db.apache.org/jdo/
+org.codehaus.groovy:groovy-all:jar:2.1.6:compile,ASLv2,https://groovy-lang.org/
+org.datanucleus:datanucleus-api-jdo:jar:3.2.6:compile,ASLv2,http://www.datanucleus.org/
+org.datanucleus:datanucleus-core:jar:3.2.10:compile,ASLv2,http://www.datanucleus.org/
+org.datanucleus:datanucleus-rdbms:jar:3.2.9:compile,ASLv2,http://www.datanucleus.org/
+org.lz4:lz4-java:jar:1.4.1:compile,ASLv2,https://github.com/lz4/lz4-java
+com.jolbox:bonecp:jar:0.8.0.RELEASE:compile,ASLv2,https://github.com/wwadge/bonecp
+stax:stax-api:jar:1.0.1:compile,ASLv2,http://stax.codehaus.org/
diff --git a/metron-analytics/metron-maas-service/pom.xml b/metron-analytics/metron-maas-service/pom.xml
index 2bb06fe..2f4ff3a 100644
--- a/metron-analytics/metron-maas-service/pom.xml
+++ b/metron-analytics/metron-maas-service/pom.xml
@@ -174,6 +174,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/metron-analytics/metron-profiler-repl/pom.xml b/metron-analytics/metron-profiler-repl/pom.xml
index 86e917d..bae7f37 100644
--- a/metron-analytics/metron-profiler-repl/pom.xml
+++ b/metron-analytics/metron-profiler-repl/pom.xml
@@ -34,6 +34,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.2</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-profiler-common</artifactId>
<version>${project.parent.version}</version>
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 0cd811d..caaed87 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -155,6 +155,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
<build>
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index d71dfc4..ea3e612 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -39,6 +39,11 @@
<version>${global_jackson_version}</version>
</dependency>
<dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ <version>5.0.3</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava_version}</version>
@@ -96,6 +101,12 @@
<groupId>org.apache.metron</groupId>
<artifactId>metron-writer-storm</artifactId>
<version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
@@ -173,6 +184,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -276,21 +291,6 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<classifier>test</classifier>
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index a1bd458..4546a7c 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -155,6 +155,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile));
// start the topology and write 3 test messages to kafka
+ fluxComponent.start();
fluxComponent.submitTopology();
kafkaComponent.writeMessages(inputTopic, message1);
kafkaComponent.writeMessages(inputTopic, message2);
@@ -196,6 +197,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile));
// start the topology and write 3 test messages to kafka
+ fluxComponent.start();
fluxComponent.submitTopology();
kafkaComponent.writeMessages(inputTopic, message1);
kafkaComponent.writeMessages(inputTopic, message2);
@@ -244,6 +246,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
uploadConfigToZookeeper(ProfilerConfig.fromJSON(eventTimeProfile));
// start the topology and write test messages to kafka
+ fluxComponent.start();
fluxComponent.submitTopology();
List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, messages);
@@ -309,6 +312,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithStats));
// start the topology and write test messages to kafka
+ fluxComponent.start();
fluxComponent.submitTopology();
List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, messages);
@@ -358,6 +362,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult));
// start the topology and write test messages to kafka
+ fluxComponent.start();
fluxComponent.submitTopology();
List<String> telemetry = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, telemetry);
@@ -519,7 +524,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@After
public void tearDown() throws Exception {
if (runner != null) {
- runner.reset();
+ fluxComponent.stop();
+ configUploadComponent.reset();
+ kafkaComponent.reset();
+ zkComponent.reset();
}
}
diff --git a/metron-contrib/metron-performance/pom.xml b/metron-contrib/metron-performance/pom.xml
index 0518fc8..4726608 100644
--- a/metron-contrib/metron-performance/pom.xml
+++ b/metron-contrib/metron-performance/pom.xml
@@ -44,7 +44,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<scope>provided</scope>
</dependency>
diff --git a/metron-deployment/ansible/roles/metron-builder/tasks/build.yml b/metron-deployment/ansible/roles/metron-builder/tasks/build.yml
index b63d1ef..247bea1 100644
--- a/metron-deployment/ansible/roles/metron-builder/tasks/build.yml
+++ b/metron-deployment/ansible/roles/metron-builder/tasks/build.yml
@@ -20,7 +20,7 @@
args:
chdir: "{{ metron_build_dir }}"
with_items:
- - mvn package -DskipTests -T 2C -P HDP-2.5.0.0,mpack
+ - mvn package -DskipTests -T 2C -P HDP-3.1,mpack
become: false
run_once: true
delegate_to: localhost
diff --git a/metron-deployment/ansible/roles/metron-builder/tasks/clean.yml b/metron-deployment/ansible/roles/metron-builder/tasks/clean.yml
index 4497d82..582235a 100644
--- a/metron-deployment/ansible/roles/metron-builder/tasks/clean.yml
+++ b/metron-deployment/ansible/roles/metron-builder/tasks/clean.yml
@@ -23,7 +23,7 @@
args:
chdir: "{{ metron_build_dir }}"
with_items:
- - mvn clean -P HDP-2.5.0.0
+ - mvn clean -P HDP-3.1
- mvn clean -P mpack
- mvn clean -P build-rpms
- mvn clean -P build-debs
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index 84269c2..8718ed8 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -43,6 +43,16 @@
</properties>
<dependencies>
<dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-all</artifactId>
+ <version>2.1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>${swagger.version}</version>
+ </dependency>
+ <dependency>
<!-- There's an issue with the Javassist versions used by our version of Powermock. Keep the explicit test dep at the top
so it takes precedence during testing - https://github.com/powermock/powermock/issues/729
-->
@@ -325,11 +335,7 @@
<artifactId>json-path</artifactId>
<version>${jsonpath.version}</version>
</dependency>
- <dependency>
- <groupId>io.springfox</groupId>
- <artifactId>springfox-swagger2</artifactId>
- <version>${swagger.version}</version>
- </dependency>
+
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
@@ -337,7 +343,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
</dependency>
<dependency>
@@ -487,6 +493,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-all</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index 03db036..52a7dbb 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -17,12 +17,14 @@
*/
package org.apache.metron.rest.config;
-import kafka.admin.AdminUtils$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.metron.common.utils.KafkaUtils;
import org.apache.metron.rest.MetronRestConstants;
import org.springframework.beans.factory.annotation.Autowired;
@@ -59,20 +61,6 @@ public class KafkaConfig {
}
/**
- * The client used for ZooKeeper.
- */
- @Autowired
- private ZkClient zkClient;
-
- /**
- * Bean for ZooKeeper
- */
- @Bean
- public ZkUtils zkUtils() {
- return ZkUtils.apply(zkClient, false);
- }
-
- /**
* Create properties that will be used by {@link org.apache.metron.rest.config.KafkaConfig#createConsumerFactory()}
*
* @return Configurations used by {@link org.apache.metron.rest.config.KafkaConfig#createConsumerFactory()}.
@@ -80,13 +68,13 @@ public class KafkaConfig {
@Bean
public Map<String, Object> consumerProperties() {
final Map<String, Object> props = new HashMap<>();
- props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
- props.put("group.id", "metron-rest");
- props.put("enable.auto.commit", "false");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "metron-rest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
props.put("security.protocol", KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)));
}
@@ -106,30 +94,35 @@ public class KafkaConfig {
@Bean
public Map<String, Object> producerProperties() {
Map<String, Object> producerConfig = new HashMap<>();
- producerConfig.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
- producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producerConfig.put("request.required.acks", 1);
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "1");
if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
- producerConfig.put("security.protocol", KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)));
+ producerConfig.put("security.protocol", SecurityProtocol.SASL_PLAINTEXT);
}
return producerConfig;
}
-
+ /**
+ * The {@link KafkaProducer} is thread-safe so we can reuse it across the application.
+ * @return
+ */
@Bean
public KafkaProducer kafkaProducer() {
return new KafkaProducer<>(producerProperties());
}
/**
- * Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit easier.
+ * Create a bean for {@link AdminClient}. This is primarily done to make testing a bit easier.
*
- * @return {@link AdminUtils$} is written in scala. We return a reference to this class.
+ * @return adminClient
*/
@Bean
- public AdminUtils$ adminUtils() {
- return AdminUtils$.MODULE$;
+ public AdminClient adminClient() {
+ Map<String, Object> adminConfig = new HashMap<>();
+ adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
+ return KafkaAdminClient.create(adminConfig);
}
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
index 6f4656e..1cb2e50 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
@@ -17,8 +17,6 @@
*/
package org.apache.metron.rest.config;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -55,11 +53,4 @@ public class ZookeeperConfig {
CuratorFramework ret = CuratorFrameworkFactory.newClient(environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY), retryPolicy);
return ret;
}
-
- @Bean(destroyMethod="close")
- public ZkClient zkClient(Environment environment) {
- int sessionTimeout = Integer.parseInt(environment.getProperty(MetronRestConstants.ZK_CLIENT_SESSION_TIMEOUT));
- int connectionTimeout = Integer.parseInt(environment.getProperty(MetronRestConstants.ZK_CLIENT_CONNECTION_TIMEOUT));
- return new ZkClient(environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY), sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$);
- }
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index ac001b5..0c7939f 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -17,24 +17,31 @@
*/
package org.apache.metron.rest.service.impl;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
+import java.time.Duration;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
-import kafka.admin.AclCommand;
-import kafka.admin.AdminOperationException;
-import kafka.admin.AdminUtils$;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.KafkaTopic;
@@ -58,40 +65,37 @@ public class KafkaServiceImpl implements KafkaService {
*/
private static final int KAFKA_CONSUMER_TIMEOUT = 100;
- private final ZkUtils zkUtils;
private final ConsumerFactory<String, String> kafkaConsumerFactory;
private final KafkaProducer<String, String> kafkaProducer;
- private final AdminUtils$ adminUtils;
+ private final AdminClient adminClient;
@Autowired
private Environment environment;
/**
- * @param zkUtils A utility class used to interact with ZooKeeper.
* @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} in order to interact with Kafka.
* @param kafkaProducer A class used to produce messages to Kafka.
- * @param adminUtils A utility class used to do administration operations on Kafka.
+ * @param adminClient A utility class used to do administration operations on Kafka.
*/
@Autowired
- public KafkaServiceImpl(final ZkUtils zkUtils,
- final ConsumerFactory<String, String> kafkaConsumerFactory,
+ public KafkaServiceImpl(final ConsumerFactory<String, String> kafkaConsumerFactory,
final KafkaProducer<String, String> kafkaProducer,
- final AdminUtils$ adminUtils) {
- this.zkUtils = zkUtils;
+ final AdminClient adminClient) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.kafkaProducer = kafkaProducer;
- this.adminUtils = adminUtils;
+ this.adminClient = adminClient;
}
@Override
public KafkaTopic createTopic(final KafkaTopic topic) throws RestException {
if (!listTopics().contains(topic.getName())) {
try {
- adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+ CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(new NewTopic(topic.getName(), topic.getNumPartitions(), (short) topic.getReplicationFactor())));
+ createTopicsResult.all();
if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)){
addACLToCurrentUser(topic.getName());
}
- } catch (AdminOperationException e) {
+ } catch (KafkaException e) {
throw new RestException(e);
}
}
@@ -102,7 +106,7 @@ public class KafkaServiceImpl implements KafkaService {
public boolean deleteTopic(final String name) {
final Set<String> topics = listTopics();
if (topics != null && topics.contains(name)) {
- adminUtils.deleteTopic(zkUtils, name);
+ adminClient.deleteTopics(Collections.singletonList(name));
return true;
} else {
return false;
@@ -150,7 +154,7 @@ public class KafkaServiceImpl implements KafkaService {
.filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
.forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
- final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
+ final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_TIMEOUT));
message = records.isEmpty() ? null : records.iterator().next().value();
kafkaConsumer.unsubscribe();
}
@@ -166,18 +170,11 @@ public class KafkaServiceImpl implements KafkaService {
@Override
public boolean addACLToCurrentUser(String name){
if(listTopics().contains(name)) {
- String zkServers = environment.getProperty(MetronRestConstants.ZK_URL_SPRING_PROPERTY);
User principal = (User) SecurityContextHolder.getContext().getAuthentication().getPrincipal();
String user = principal.getUsername();
- List<String> cmd = new ArrayList<>();
- cmd.add("--add");
- cmd.add("--allow-principal");
- cmd.add("User:" + user);
- cmd.add("--topic");
- cmd.add(name);
- cmd.add("--authorizer-properties");
- cmd.add("zookeeper.connect=" + String.join(",", zkServers));
- AclCommand.main(cmd.toArray(new String[cmd.size()]));
+ ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, name, PatternType.LITERAL);
+ AccessControlEntry accessControlEntry = new AccessControlEntry(user, "*", AclOperation.ALL, AclPermissionType.ALLOW);
+ adminClient.createAcls(Collections.singletonList(new AclBinding(resourcePattern, accessControlEntry)));
} else {
return false;
}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index 667eb26..3d13341 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -17,7 +17,6 @@
*/
package org.apache.metron.rest.config;
-import kafka.admin.AdminUtils$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
@@ -30,7 +29,11 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
@@ -162,13 +165,13 @@ public class TestConfig {
@Bean
public Map<String, Object> kafkaConsumer(KafkaComponent kafkaWithZKComponent) {
Map<String, Object> props = new HashMap<>();
- props.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList());
- props.put("group.id", "metron-config");
- props.put("enable.auto.commit", "false");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaWithZKComponent.getBrokerList());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "metron-config");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
@@ -180,10 +183,10 @@ public class TestConfig {
@Bean
public Map<String, Object> producerProperties(KafkaComponent kafkaWithZKComponent) {
Map<String, Object> producerConfig = new HashMap<>();
- producerConfig.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList());
- producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- producerConfig.put("request.required.acks", 1);
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaWithZKComponent.getBrokerList());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "1");
return producerConfig;
}
@@ -205,8 +208,10 @@ public class TestConfig {
}
@Bean
- public AdminUtils$ adminUtils() {
- return AdminUtils$.MODULE$;
+ public AdminClient adminUtils(KafkaComponent kafkaWithZKComponent) {
+ Map<String, Object> adminConfig = new HashMap<>();
+ adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaWithZKComponent.getBrokerList());
+ return AdminClient.create(adminConfig);
}
@Bean(destroyMethod = "close")
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
index b99128a..4c56166 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -20,8 +20,8 @@ package org.apache.metron.rest.service.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -32,24 +32,27 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.util.ArrayList;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import kafka.admin.AdminOperationException;
-import kafka.admin.AdminUtils$;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZkUtils;
+
+import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.KafkaTopic;
import org.apache.metron.rest.service.KafkaService;
@@ -57,26 +60,16 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.kafka.core.ConsumerFactory;
-
-@SuppressWarnings("unchecked")
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*") // resolve classloader conflict
-@PrepareForTest({AdminUtils$.class})
public class KafkaServiceImplTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
- private ZkUtils zkUtils;
private KafkaConsumer<String, String> kafkaConsumer;
private KafkaProducer<String, String> kafkaProducer;
private ConsumerFactory<String, String> kafkaConsumerFactory;
- private AdminUtils$ adminUtils;
+ private AdminClient adminClient;
private KafkaService kafkaService;
@@ -91,15 +84,14 @@ public class KafkaServiceImplTest {
@SuppressWarnings("unchecked")
@Before
public void setUp() throws Exception {
- zkUtils = mock(ZkUtils.class);
kafkaConsumerFactory = mock(ConsumerFactory.class);
kafkaConsumer = mock(KafkaConsumer.class);
kafkaProducer = mock(KafkaProducer.class);
- adminUtils = mock(AdminUtils$.class);
+ adminClient = mock(AdminClient.class);
when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer);
- kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, kafkaProducer, adminUtils);
+ kafkaService = new KafkaServiceImpl(kafkaConsumerFactory, kafkaProducer, adminClient);
}
@Test
@@ -112,10 +104,9 @@ public class KafkaServiceImplTest {
assertEquals(Sets.newHashSet(), listedTopics);
- verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
- verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils);
+ verifyNoMoreInteractions(kafkaConsumer, adminClient);
}
@Test
@@ -128,10 +119,9 @@ public class KafkaServiceImplTest {
assertEquals(Sets.newHashSet(), listedTopics);
- verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
- verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ verifyNoMoreInteractions(kafkaConsumer);
}
@Test
@@ -147,10 +137,9 @@ public class KafkaServiceImplTest {
assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
- verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
- verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ verifyNoMoreInteractions(kafkaConsumer);
}
@Test
@@ -167,10 +156,9 @@ public class KafkaServiceImplTest {
assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
- verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
- verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ verifyNoMoreInteractions(kafkaConsumer);
}
@Test
@@ -179,10 +167,9 @@ public class KafkaServiceImplTest {
assertFalse(kafkaService.deleteTopic("non_existent_topic"));
- verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
- verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ verifyNoMoreInteractions(kafkaConsumer);
}
@Test
@@ -196,7 +183,7 @@ public class KafkaServiceImplTest {
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer).close();
- verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic");
+ verify(adminClient).deleteTopics(Collections.singletonList("non_existent_topic"));
verifyNoMoreInteractions(kafkaConsumer);
}
@@ -248,29 +235,21 @@ public class KafkaServiceImplTest {
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer, times(0)).partitionsFor("t");
verify(kafkaConsumer).close();
- verifyZeroInteractions(zkUtils);
verifyNoMoreInteractions(kafkaConsumer);
}
@Test
public void createTopicShouldFailIfReplicationFactorIsGreaterThanAvailableBrokers() throws Exception {
exception.expect(RestException.class);
- doThrow(AdminOperationException.class).when(adminUtils).createTopic(eq(zkUtils), eq("t"), eq(1), eq(2), eq(new Properties()), eq(RackAwareMode.Disabled$.MODULE$));
+ when(adminClient.createTopics(any(Collection.class))).thenThrow(InvalidReplicationFactorException.class);
kafkaService.createTopic(VALID_KAFKA_TOPIC);
}
@Test
- public void whenAdminUtilsThrowsAdminOperationExceptionCreateTopicShouldProperlyWrapExceptionInRestException() throws Exception {
+ public void whenAdminClientThrowsKafkaExceptionCreateTopicShouldProperlyWrapExceptionInRestException() throws Exception {
exception.expect(RestException.class);
-
- final Map<String, List<PartitionInfo>> topics = new HashMap<>();
- topics.put("1", new ArrayList<>());
-
- when(kafkaConsumer.listTopics()).thenReturn(topics);
-
- doThrow(AdminOperationException.class).when(adminUtils).createTopic(eq(zkUtils), eq("t"), eq(1), eq(2), eq(new Properties()), eq(RackAwareMode.Disabled$.MODULE$));
-
+ when(adminClient.createTopics(any(Collection.class))).thenThrow(KafkaException.class);
kafkaService.createTopic(VALID_KAFKA_TOPIC);
}
@@ -291,18 +270,18 @@ public class KafkaServiceImplTest {
when(kafkaConsumer.partitionsFor(eq(topicName))).thenReturn(partitionInfo);
when(kafkaConsumer.assignment()).thenReturn(topicPartitionsSet);
when(kafkaConsumer.position(topicPartition)).thenReturn(1L);
- when(kafkaConsumer.poll(100)).thenReturn(records);
+ when(kafkaConsumer.poll(Duration.ofMillis(100))).thenReturn(records);
assertEquals("message", kafkaService.getSampleMessage(topicName));
verify(kafkaConsumer).assign(eq(topicPartitions));
verify(kafkaConsumer).assignment();
- verify(kafkaConsumer).poll(100);
+ verify(kafkaConsumer).poll(Duration.ofMillis(100));
verify(kafkaConsumer).unsubscribe();
verify(kafkaConsumer, times(2)).position(topicPartition);
verify(kafkaConsumer).seek(topicPartition, 0);
- verifyZeroInteractions(zkUtils, adminUtils);
+ verifyZeroInteractions(adminClient);
}
@Test
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index d215180..cf4759c 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -64,6 +64,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
@@ -87,7 +93,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<scope>provided</scope>
<exclusions>
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
index 5cd25a2..685590d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -25,7 +25,7 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.util.ArrayList;
import java.util.List;
@@ -107,15 +107,15 @@ public enum KafkaUtils {
*/
public String normalizeProtocol(String protocol) {
if(protocol.equalsIgnoreCase("PLAINTEXTSASL") || protocol.equalsIgnoreCase("SASL_PLAINTEXT")) {
- if(SecurityProtocol.getNames().contains("PLAINTEXTSASL")) {
+ if(SecurityProtocol.names().contains("PLAINTEXTSASL")) {
return "PLAINTEXTSASL";
}
- else if(SecurityProtocol.getNames().contains("SASL_PLAINTEXT")) {
+ else if(SecurityProtocol.names().contains("SASL_PLAINTEXT")) {
return "SASL_PLAINTEXT";
}
else {
throw new IllegalStateException("Unable to find the appropriate SASL protocol, " +
- "viable options are: " + Joiner.on(",").join(SecurityProtocol.getNames()));
+ "viable options are: " + Joiner.on(",").join(SecurityProtocol.names()));
}
}
else {
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
index ce61df3..ac4e0a2 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
@@ -78,7 +78,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>${guava_version}</version>
+ <version>${global_guava_version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
@@ -134,21 +134,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<!-- must be near the top of the dependencies to force retrieval of the correct netty version -->
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -196,6 +181,10 @@
<artifactId>commons-httpclient</artifactId>
<groupId>commons-httpclient</groupId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
index fe21399..a76e91e 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
@@ -35,7 +35,11 @@
<dependencies>
<!-- Metron -->
-
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_guava_version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-enrichment-common</artifactId>
@@ -162,7 +166,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<exclusions>
<exclusion>
@@ -213,17 +217,6 @@
<version>${global_jackson_version}</version>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${global_hbase_guava_version}</version>
- </dependency>
- <dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
<version>1.4.0</version>
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index 1deb04a..1095021 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -61,6 +61,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
diff --git a/metron-platform/metron-hbase-server/pom.xml b/metron-platform/metron-hbase-server/pom.xml
index 0cbdeeb..27831fb 100644
--- a/metron-platform/metron-hbase-server/pom.xml
+++ b/metron-platform/metron-hbase-server/pom.xml
@@ -176,6 +176,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/metron-platform/metron-indexing/metron-indexing-common/pom.xml b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
index 3f8e812..e79758f 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
@@ -141,6 +141,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
index 581e819..b52ce7f 100644
--- a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
@@ -31,6 +31,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.2</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-indexing-common</artifactId>
<version>${project.parent.version}</version>
diff --git a/metron-platform/metron-integration-test/pom.xml b/metron-platform/metron-integration-test/pom.xml
index 427290f..1719566 100644
--- a/metron-platform/metron-integration-test/pom.xml
+++ b/metron-platform/metron-integration-test/pom.xml
@@ -130,7 +130,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<classifier>test</classifier>
<exclusions>
@@ -145,6 +145,19 @@
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>${global_kafka_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>${global_kafka_version}</version>
+ <classifier>test</classifier>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 08910be..cad8549 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -20,12 +20,13 @@ package org.apache.metron.integration.components;
import com.google.common.base.Function;
import java.lang.invoke.MethodHandles;
-import java.nio.ByteBuffer;
+import java.time.Duration;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -33,21 +34,21 @@ import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.common.TopicExistsException;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
import kafka.utils.TestUtils;
-import kafka.utils.Time;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.admin.AdminClient;
+
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
@@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory;
public class KafkaComponent implements InMemoryComponent {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final int POLL_SECONDS = 1;
public static class Topic {
public int numPartitions;
@@ -79,7 +81,8 @@ public class KafkaComponent implements InMemoryComponent {
private List<KafkaProducer> producersCreated = new ArrayList<>();
private transient KafkaServer kafkaServer;
private transient ZkClient zkClient;
- private transient ConsumerConnector consumer;
+ private transient AdminClient adminClient;
+ private transient KafkaConsumer<byte[], byte[]> consumer;
private String zookeeperConnectString;
private Properties topologyProperties;
@@ -137,13 +140,20 @@ public class KafkaComponent implements InMemoryComponent {
return createProducer(String.class, byte[].class);
}
+ public AdminClient createAdminClient() {
+ Map<String, Object> adminConfig = new HashMap<>();
+ adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+ AdminClient adminClient = AdminClient.create(adminConfig);
+ return adminClient;
+ }
+
public <K,V> KafkaProducer<K,V> createProducer(Map<String, Object> properties, Class<K> keyClass, Class<V> valueClass)
{
Map<String, Object> producerConfig = new HashMap<>();
- producerConfig.put("bootstrap.servers", getBrokerList());
- producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerConfig.put("request.required.acks", "-1");
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "-1");
producerConfig.put("fetch.message.max.bytes", ""+ 1024*1024*10);
producerConfig.put("replica.fetch.max.bytes", "" + 1024*1024*10);
producerConfig.put("message.max.bytes", "" + 1024*1024*10);
@@ -154,6 +164,18 @@ public class KafkaComponent implements InMemoryComponent {
return ret;
}
+ public <K,V> KafkaConsumer<K, V> createConsumer(Map<String, Object> properties) {
+ Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerConfig.putAll(properties);
+ KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
+ return consumer;
+ }
+
@Override
public void start() {
// setup Zookeeper
@@ -184,6 +206,8 @@ public class KafkaComponent implements InMemoryComponent {
if(postStartCallback != null) {
postStartCallback.apply(this);
}
+
+ adminClient = createAdminClient();
}
public String getZookeeperConnect() {
@@ -192,7 +216,6 @@ public class KafkaComponent implements InMemoryComponent {
@Override
public void stop() {
- shutdownConsumer();
shutdownProducers();
if(kafkaServer != null) {
@@ -222,8 +245,6 @@ public class KafkaComponent implements InMemoryComponent {
zkClient.deleteRecursive(ZkUtils.PreferredReplicaLeaderElectionPath());
zkClient.deleteRecursive(ZkUtils.BrokerSequenceIdPath());
zkClient.deleteRecursive(ZkUtils.IsrChangeNotificationPath());
- zkClient.deleteRecursive(ZkUtils.EntityConfigPath());
- zkClient.deleteRecursive(ZkUtils.EntityConfigChangesPath());
zkClient.close();
}
}
@@ -237,42 +258,23 @@ public class KafkaComponent implements InMemoryComponent {
}
public List<byte[]> readMessages(String topic) {
- SimpleConsumer consumer = new SimpleConsumer("localhost", 6667, 100000, 64 * 1024, "consumer");
- FetchRequest req = new FetchRequestBuilder()
- .clientId("consumer")
- .addFetch(topic, 0, 0, 100000)
- .build();
- FetchResponse fetchResponse = consumer.fetch(req);
- Iterator<MessageAndOffset> results = fetchResponse.messageSet(topic, 0).iterator();
+ if (consumer == null) {
+ consumer = createConsumer(new HashMap<>());
+ }
+ consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
+ consumer.seek(new TopicPartition(topic, 0), 0);
List<byte[]> messages = new ArrayList<>();
- while(results.hasNext()) {
- ByteBuffer payload = results.next().message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- messages.add(bytes);
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(POLL_SECONDS));
+ for(ConsumerRecord<byte[], byte[]> record: records) {
+ messages.add(record.value());
}
- consumer.close();
- return messages;
- }
- public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic) {
- return getStreamIterator(topic, "group0", "consumer0");
- }
- public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic, String group, String consumerName) {
- // setup simple consumer
- Properties consumerProperties = TestUtils.createConsumerProperties(zookeeperConnectString, group, consumerName, -1);
- consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(topic, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
- ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
- return iterator;
+ return messages;
}
public void shutdownConsumer() {
if(consumer != null) {
- consumer.shutdown();
+ consumer.close();
}
}
diff --git a/metron-platform/metron-management/pom.xml b/metron-platform/metron-management/pom.xml
index c5c319e..8fa10bb 100644
--- a/metron-platform/metron-management/pom.xml
+++ b/metron-platform/metron-management/pom.xml
@@ -150,6 +150,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -176,22 +182,6 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <classifier>test</classifier>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<classifier>test</classifier>
diff --git a/metron-platform/metron-parsing/metron-parsers-common/pom.xml b/metron-platform/metron-parsing/metron-parsers-common/pom.xml
index 631bdd8..ad9a2b0 100644
--- a/metron-platform/metron-parsing/metron-parsers-common/pom.xml
+++ b/metron-platform/metron-parsing/metron-parsers-common/pom.xml
@@ -144,21 +144,6 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<classifier>test</classifier>
@@ -180,6 +165,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.github.fge</groupId>
diff --git a/metron-platform/metron-parsing/metron-parsers/pom.xml b/metron-platform/metron-parsing/metron-parsers/pom.xml
index d888e19..5c323d8 100644
--- a/metron-platform/metron-parsing/metron-parsers/pom.xml
+++ b/metron-platform/metron-parsing/metron-parsers/pom.xml
@@ -32,6 +32,11 @@
<dependencies>
<!-- Metron Dependencies -->
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_guava_version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-parsers-common</artifactId>
<version>${project.parent.version}</version>
@@ -43,11 +48,7 @@
</dependency>
<!-- 3rd party dependencies -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${global_guava_version}</version>
- </dependency>
+
<dependency>
<groupId>io.thekraken</groupId>
<artifactId>grok</artifactId>
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 1382bba..34ba9da 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -37,7 +37,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.utils.JSONUtils;
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index ccc69b8..d1bd1fd 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -131,21 +131,6 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<exclusions>
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index a8f0676..3f9a200 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
-import kafka.consumer.ConsumerIterator;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -162,12 +161,8 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
KafkaUtil.send(producer, pcapEntries, KAFKA_TOPIC, 2);
System.out.println("Sent pcap data: " + pcapEntries.size());
{
- int numMessages = 0;
- ConsumerIterator<?, ?> it = kafkaComponent.getStreamIterator(KAFKA_TOPIC);
- for (int i = 0; i < pcapEntries.size(); ++i, it.next()) {
- numMessages++;
- }
- Assert.assertEquals(pcapEntries.size(), numMessages);
+ List<byte[]> messages = kafkaComponent.readMessages(KAFKA_TOPIC);
+ Assert.assertEquals(pcapEntries.size(), messages.size());
System.out.println("Wrote " + pcapEntries.size() + " to kafka");
}
}
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index ffa7f3d..02180fb 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -28,6 +28,11 @@
</properties>
<dependencies>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.2</version>
+ </dependency>
+ <dependency>
<groupId>net.byteseek</groupId>
<artifactId>byteseek</artifactId>
<version>2.0.3</version>
@@ -123,30 +128,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-storm-kafka</artifactId>
<version>${project.parent.version}</version>
diff --git a/metron-platform/metron-solr/metron-solr-common/pom.xml b/metron-platform/metron-solr/metron-solr-common/pom.xml
index ffaf6d9..0346850 100644
--- a/metron-platform/metron-solr/metron-solr-common/pom.xml
+++ b/metron-platform/metron-solr/metron-solr-common/pom.xml
@@ -85,21 +85,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-indexing-common</artifactId>
<version>${project.parent.version}</version>
@@ -191,6 +176,10 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml
index 22c8c50..b515ceb 100644
--- a/metron-platform/metron-solr/metron-solr-storm/pom.xml
+++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml
@@ -31,30 +31,6 @@
</properties>
<dependencies>
<dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-solr-common</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-solr-common</artifactId>
- <version>${project.parent.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-indexing-storm</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-indexing-storm</artifactId>
- <version>${project.parent.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-test-framework</artifactId>
<version>${global_solr_version}</version>
@@ -83,6 +59,30 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-solr-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-solr-common</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-indexing-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-indexing-storm</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${global_flux_version}</version>
diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml
index 3addee3..c30b8e6 100644
--- a/metron-platform/metron-storm-kafka-override/pom.xml
+++ b/metron-platform/metron-storm-kafka-override/pom.xml
@@ -74,30 +74,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
<build>
<plugins>
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
index da762bf..82fece2 100644
--- a/metron-platform/metron-storm-kafka/pom.xml
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -69,30 +69,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>${global_kafka_version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-common</artifactId>
<version>${project.parent.version}</version>
diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml
index 221b13c..c651fc7 100644
--- a/metron-platform/metron-test-utilities/pom.xml
+++ b/metron-platform/metron-test-utilities/pom.xml
@@ -131,7 +131,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<exclusions>
<exclusion>
diff --git a/metron-platform/metron-writer/metron-writer-common/pom.xml b/metron-platform/metron-writer/metron-writer-common/pom.xml
index 0d91b52..8140a6e 100644
--- a/metron-platform/metron-writer/metron-writer-common/pom.xml
+++ b/metron-platform/metron-writer/metron-writer-common/pom.xml
@@ -135,7 +135,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka-clients</artifactId>
<version>${global_kafka_version}</version>
<scope>provided</scope>
<exclusions>
diff --git a/metron-platform/metron-writer/metron-writer-common/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/metron-writer-common/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index a313057..8e7bac7 100644
--- a/metron-platform/metron-writer/metron-writer-common/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/metron-writer-common/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -90,7 +90,7 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
private int requiredAcks = 1;
private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
private String kafkaTopicField = null;
- private KafkaProducer kafkaProducer;
+ private KafkaProducer<String, String> kafkaProducer;
private String configPrefix = null;
private String zkQuorum = null;
private Map<String, Object> producerConfigs = new HashMap<>();
@@ -213,10 +213,10 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
public Map<String, Object> createProducerConfigs() {
Map<String, Object> producerConfig = new HashMap<>();
- producerConfig.put("bootstrap.servers", brokerUrl);
- producerConfig.put("key.serializer", keySerializer);
- producerConfig.put("value.serializer", valueSerializer);
- producerConfig.put("request.required.acks", requiredAcks);
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, Integer.toString(requiredAcks));
producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE);
producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
diff --git a/metron-platform/metron-writer/metron-writer-common/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java b/metron-platform/metron-writer/metron-writer-common/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
index cac3d0b..21bf3fc 100644
--- a/metron-platform/metron-writer/metron-writer-common/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
+++ b/metron-platform/metron-writer/metron-writer-common/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
@@ -93,7 +93,7 @@ public class KafkaWriterTest {
assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
- assertEquals(producerConfigs.get("request.required.acks"), 1);
+ assertEquals(producerConfigs.get("acks"), "1");
assertEquals(producerConfigs.get("key1"), 1);
assertEquals(producerConfigs.get("key2"), "value2");
}
@@ -115,7 +115,7 @@ public class KafkaWriterTest {
assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
- assertEquals(producerConfigs.get("request.required.acks"), 1);
+ assertEquals(producerConfigs.get("acks"), "1");
assertEquals(producerConfigs.get("key1"), 1);
assertEquals(producerConfigs.get("key2"), "value2");
}
diff --git a/metron-stellar/stellar-zeppelin/pom.xml b/metron-stellar/stellar-zeppelin/pom.xml
index 6ce6804..9626fd4 100644
--- a/metron-stellar/stellar-zeppelin/pom.xml
+++ b/metron-stellar/stellar-zeppelin/pom.xml
@@ -39,6 +39,12 @@
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
diff --git a/pom.xml b/pom.xml
index f4e8856..73782ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,11 +99,12 @@
<global_opencsv_version>3.7</global_opencsv_version>
<global_curator_version>2.7.1</global_curator_version>
<global_classindex_version>3.3</global_classindex_version>
+ <global_hbase_version>1.1.1</global_hbase_version>
+ <global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_storm_version>1.0.3</global_storm_version>
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
<global_flux_version>${base_flux_version}</global_flux_version>
<global_pcap_version>1.7.1</global_pcap_version>
- <global_kafka_version>0.10.0.1</global_kafka_version>
<global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
<global_flume_version>${base_flume_version}</global_flume_version>
<global_elasticsearch_version>5.6.14</global_elasticsearch_version>
@@ -112,7 +113,6 @@
<global_junit_version>4.12</global_junit_version>
<global_guava_version>17.0</global_guava_version>
<global_json_schema_validator_version>2.2.5</global_json_schema_validator_version>
- <global_slf4j_version>1.7.7</global_slf4j_version>
<global_opencsv_version>3.7</global_opencsv_version>
<global_java_version>1.8</global_java_version>
<global_solr_version>6.6.2</global_solr_version>
@@ -139,22 +139,14 @@
<profiles>
<profile>
<id>HDP-3.1</id>
- <properties>
- <hdp_version>3.1.0.0</hdp_version>
- <global_hbase_version>2.0.2</global_hbase_version>
- <global_hbase_guava_version>17.0</global_hbase_guava_version>
- <global_zeppelin_version>0.8.0</global_zeppelin_version>
- </properties>
- </profile>
- <profile>
- <id>HDP-2.6</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <hdp_version>2.6.5.0</hdp_version>
- <global_hbase_version>1.1.1</global_hbase_version>
- <global_hbase_guava_version>12.0</global_hbase_guava_version>
+ <hdp_version>3.1.0.0</hdp_version>
+ <global_kafka_version>2.0.0</global_kafka_version>
+ <global_slf4j_version>1.7.25</global_slf4j_version>
+ <global_storm_version>1.2.1</global_storm_version>
<global_zeppelin_version>0.8.0</global_zeppelin_version>
</properties>
</profile>
@@ -163,12 +155,12 @@
<properties>
<hdp_version>2.5.0.0</hdp_version>
<build_number>1245</build_number>
+ <global_hbase_version>1.1.1</global_hbase_version>
+ <global_hbase_guava_version>12.0</global_hbase_guava_version>
<global_storm_kafka_version>1.2.2</global_storm_kafka_version>
<global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
- <global_hbase_version>1.1.1</global_hbase_version>
- <global_hbase_guava_version>12.0</global_hbase_guava_version>
- <global_zeppelin_version>0.8.0</global_zeppelin_version>
+ <global_zeppelin_version>0.7.3</global_zeppelin_version>
</properties>
</profile>
</profiles>