You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/03/30 22:17:25 UTC
[1/3] incubator-metron git commit: METRON-797: Pass security.protocol
and enable auto-renew for the storm topologies
Repository: incubator-metron
Updated Branches:
refs/heads/master 98dc7659a -> aef84636a
METRON-797: Pass security.protocol and enable auto-renew for the storm topologies
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/dae102b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/dae102b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/dae102b0
Branch: refs/heads/master
Commit: dae102b0228b969d4e685a81dd6df25e59f63cb5
Parents: 98dc765
Author: cstella <ce...@gmail.com>
Authored: Wed Mar 29 08:17:52 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Wed Mar 29 08:17:52 2017 -0400
----------------------------------------------------------------------
dependencies_with_url.csv | 5 ++
.../src/main/config/profiler.properties | 6 +-
.../src/main/flux/profiler/remote.yaml | 17 ++++-
.../integration/ProfilerIntegrationTest.java | 2 +
.../METRON/CURRENT/configuration/metron-env.xml | 2 +
.../package/templates/enrichment.properties.j2 | 2 +
.../roles/metron-builder/tasks/main.yml | 2 +-
.../flatfile/importer/MapReduceImporter.java | 2 +
.../src/main/config/elasticsearch.properties | 2 +
.../src/main/config/enrichment.properties | 3 +
.../src/main/flux/enrichment/remote.yaml | 29 +++++++-
.../integration/EnrichmentIntegrationTest.java | 2 +
metron-platform/metron-hbase/pom.xml | 15 ++++
.../src/main/flux/indexing/remote.yaml | 17 +++++
.../integration/IndexingIntegrationTest.java | 2 +
.../parsers/topology/ParserTopologyBuilder.java | 75 +++++++++++++++++---
.../parsers/topology/ParserTopologyCLI.java | 53 +++++++++++---
.../components/ParserTopologyComponent.java | 8 +--
.../parsers/topology/ParserTopologyCLITest.java | 18 ++++-
.../src/main/config/pcap.properties | 2 +
.../src/main/flux/pcap/remote.yaml | 6 ++
.../PcapTopologyIntegrationTest.java | 2 +
.../metron-solr/src/main/config/solr.properties | 2 +
.../metron/writer/hdfs/SourceHandler.java | 2 -
.../apache/metron/writer/kafka/KafkaWriter.java | 7 +-
25 files changed, 247 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 21f0cb5..25650a3 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -65,6 +65,7 @@ com.sun.jersey:jersey-json:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
com.sun.jersey:jersey-server:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
com.thoughtworks.paranamer:paranamer:jar:2.3:compile,BSD,https://github.com/paul-hammant/paranamer
javax.servlet.jsp:jsp-api:jar:2.1:runtime,CDDL,http://oracle.com
+javax.servlet.jsp:jsp-api:jar:2.1:compile,CDDL,http://oracle.com
javax.servlet:servlet-api:jar:2.5:compile,CDDL,http://oracle.com
net.jcip:jcip-annotations:jar:1.0:compile,Public,http://jcip.net/
org.codehaus.jettison:jettison:jar:1.1:compile,ASLv2,https://github.com/codehaus/jettison
@@ -157,6 +158,7 @@ commons-digester:commons-digester:jar:1.8:compile,The Apache Software License, V
commons-digester:commons-digester:jar:2.1:compile,ASLv2,http://commons.apache.org/digester/
commons-el:commons-el:jar:1.0:provided,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
commons-el:commons-el:jar:1.0:runtime,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
+commons-el:commons-el:jar:1.0:compile,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
commons-httpclient:commons-httpclient:jar:3.1:compile,Apache License,http://jakarta.apache.org/httpcomponents/httpclient-3.x/
commons-io:commons-io:jar:2.4:compile,ASLv2,http://commons.apache.org/io/
commons-io:commons-io:jar:2.5:compile,ASLv2,http://commons.apache.org/io/
@@ -288,3 +290,6 @@ com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/lic
de.jollyday:jollyday:jar:0.5.2:compile,ASLv2,http://jollyday.sourceforge.net/license.html
org.threeten:threeten-extra:jar:1.0:compile,BSD,http://www.threeten.org/threeten-extra/license.html
org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/classindex
+com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp
+com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp
+org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 860934f..b79ac73 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -20,6 +20,10 @@
##### Storm #####
+storm.auto.credentials=[]
+
+##### Profiler #####
+
profiler.workers=1
profiler.executors=0
profiler.input.topic=indexing
@@ -34,10 +38,10 @@ profiler.hbase.column.family=P
profiler.hbase.batch=10
profiler.hbase.flush.interval.seconds=30
-
##### Kafka #####
kafka.zk=node1:2181
kafka.broker=node1:6667
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 7ea77a5..0b14bce 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,7 +17,7 @@
name: "profiler"
config:
-
+ topology.auto-credentials: ${storm.auto.credentials}
topology.workers: ${profiler.workers}
topology.acker.executors: ${profiler.executors}
@@ -60,6 +60,10 @@ components:
args:
- "group.id"
- "profiler"
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
# The fields to pull out of the kafka messages
- id: "fields"
@@ -83,6 +87,13 @@ components:
args:
- "${kafka.start}"
+ - id: "kafkaWriterProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
- id: "kafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
@@ -91,6 +102,8 @@ components:
args: ["${profiler.output.topic}"]
- name: "withZkQuorum"
args: ["${kafka.zk}"]
+ - name: "withProducerConfigs"
+ args: [ref: "kafkaWriterProps"]
- id: "kafkaDestinationHandler"
className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler"
@@ -174,4 +187,4 @@ streams:
to: "kafkaBolt"
grouping:
streamId: "kafka"
- type: SHUFFLE
\ No newline at end of file
+ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index bca8ed5..7591300 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -305,6 +305,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
setProperty("profiler.hbase.flush.interval.seconds", "1");
setProperty("profiler.profile.ttl", "20");
setProperty("hbase.provider.impl", "" + MockTableProvider.class.getName());
+ setProperty("storm.auto.credentials", "[]");
+ setProperty("kafka.security.protocol", "PLAINTEXT");
}};
// create the mock table
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 199e708..108e0ba 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -178,6 +178,8 @@ indexing.executors=0
kafka.zk={{ zookeeper_quorum }}
kafka.broker={{ kafka_brokers }}
kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
+storm.auto.credentials=[]
##### Indexing #####
index.input.topic=indexing
index.error.topic=indexing
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
index dc108f7..8fc2335 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
@@ -19,6 +19,8 @@
kafka.zk={{zookeeper_quorum}}
kafka.broker={{kafka_brokers}}
+kafka.security.protocol=PLAINTEXT
+storm.auto.credentials=[]
enrichment.output.topic=indexing
enrichment.error.topic=enrichments_error
threat.intel.error.topic=threatintel_error
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/roles/metron-builder/tasks/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron-builder/tasks/main.yml b/metron-deployment/roles/metron-builder/tasks/main.yml
index 889eafe..3f4906e 100644
--- a/metron-deployment/roles/metron-builder/tasks/main.yml
+++ b/metron-deployment/roles/metron-builder/tasks/main.yml
@@ -16,6 +16,6 @@
#
---
- name: Build Deployment Artifacts
- local_action: shell cd {{ metron_build_dir }} && mvn clean package -DskipTests -P HDP-2.5.0.0,mpack,build-rpms
+ local_action: shell cd {{ metron_build_dir }} && mvn clean package -DskipTests -T 2C -P HDP-2.5.0.0,mpack,build-rpms
become: false
run_once: true
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
index e83bdd6..63a84cb 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
@@ -66,6 +67,7 @@ public enum MapReduceImporter implements Importer{
job.setNumReduceTasks(0);
List<Path> paths = inputs.stream().map(p -> new Path(p)).collect(Collectors.toList());
handler.getInputFormat().set(job, paths, handler.getConfig());
+ TableMapReduceUtil.initCredentials(job);
try {
job.waitForCompletion(true);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index 317742b..d45d3d4 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -17,6 +17,7 @@
##### Storm #####
indexing.workers=1
indexing.executors=0
+storm.auto.credentials=[]
##### Kafka #####
@@ -24,6 +25,7 @@ kafka.zk=node1:2181
kafka.broker=node1:6667
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
##### Indexing #####
index.input.topic=indexing
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/main/config/enrichment.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
index c905d30..af5b27b 100644
--- a/metron-platform/metron-enrichment/src/main/config/enrichment.properties
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
@@ -19,6 +19,9 @@
kafka.zk=node1:2181
kafka.broker=node1:6667
+kafka.security.protocol=PLAINTEXT
+storm.auto.credentials=[]
+
enrichment.output.topic=indexing
enrichment.error.topic=indexing
threat.intel.error.topic=indexing
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
index 439105f..51fc7ce 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
@@ -18,8 +18,10 @@ name: "enrichment"
config:
topology.workers: 1
topology.acker.executors: 0
+ topology.auto-credentials: ${storm.auto.credentials}
components:
+
# Enrichment
- id: "stellarEnrichmentAdapter"
className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
@@ -28,6 +30,15 @@ components:
args:
- "ENRICHMENT"
+ # Any kafka props for the producer go here.
+ - id: "kafkaWriterProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+
- id: "stellarEnrichment"
className: "org.apache.metron.enrichment.configuration.Enrichment"
constructorArgs:
@@ -100,6 +111,9 @@ components:
- name: "withZkQuorum"
args:
- "${kafka.zk}"
+ - name: "withProducerConfigs"
+ args:
+ - ref: "kafkaWriterProps"
# Threat Intel
- id: "stellarThreatIntelAdapter"
@@ -163,7 +177,9 @@ components:
- name: "withZkQuorum"
args:
- "${kafka.zk}"
-
+ - name: "withProducerConfigs"
+ args:
+ - ref: "kafkaWriterProps"
#indexing
- id: "kafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
@@ -174,9 +190,12 @@ components:
- name: "withZkQuorum"
args:
- "${kafka.zk}"
+ - name: "withProducerConfigs"
+ args:
+ - ref: "kafkaWriterProps"
#kafka/zookeeper
- # Any kafka props for the producer go here.
+ # Any kafka props for the consumer go here.
- id: "kafkaProps"
className: "java.util.HashMap"
configMethods:
@@ -192,6 +211,11 @@ components:
args:
- "group.id"
- "enrichments"
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+
# The fields to pull out of the kafka messages
- id: "fields"
@@ -299,6 +323,7 @@ bolts:
args:
- ref: "enrichmentErrorKafkaWriter"
+
# Threat Intel Bolts
- id: "threatIntelSplitBolt"
className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index e012c55..77b64dc 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -141,6 +141,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
setProperty("enrichment.simple.hbase.cf", cf);
setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC);
setProperty("enrichment.error.topic", ERROR_TOPIC);
+ setProperty("kafka.security.protocol", "PLAINTEXT");
+ setProperty("storm.auto.credentials", "[]");
}};
final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml
index 36df5a3..4921859 100644
--- a/metron-platform/metron-hbase/pom.xml
+++ b/metron-platform/metron-hbase/pom.xml
@@ -118,6 +118,21 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
+ <artifactId>storm-hbase</artifactId>
+ <version>${global_storm_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${global_storm_version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
index 3905a7a..ec423c5 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -15,9 +15,11 @@
# limitations under the License.
name: "indexing"
+
config:
topology.workers: ${indexing.workers}
topology.acker.executors: ${indexing.executors}
+ topology.auto-credentials: ${storm.auto.credentials}
components:
@@ -49,6 +51,15 @@ components:
- name: "withRotationPolicy"
args:
- ref: "hdfsRotationPolicy"
+
+ - id: "kafkaWriterProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+
- id: "kafkaWriter"
className: "org.apache.metron.writer.kafka.KafkaWriter"
configMethods:
@@ -58,6 +69,8 @@ components:
- name: "withZkQuorum"
args:
- "${kafka.zk}"
+ - name: "withProducerConfigs"
+ args: [ref: "kafkaWriterProps"]
- id: "indexWriter"
className: "${writer.class.name}"
@@ -79,6 +92,10 @@ components:
args:
- "group.id"
- "indexing"
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
# The fields to pull out of the kafka messages
- id: "fields"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 394fbf0..cc7d7e3 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -121,6 +121,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
final Properties topologyProperties = new Properties() {{
setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+ setProperty("kafka.security.protocol", "PLAINTEXT");
+ setProperty("storm.auto.credentials", "[]");
setProperty("indexing.workers", "1");
setProperty("indexing.executors", "0");
setProperty("index.input.topic", Constants.INDEXING_TOPIC);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index b347ca5..e9acbaa 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -64,7 +64,7 @@ public class ParserTopologyBuilder {
* @throws Exception
*/
public static TopologyBuilder build(String zookeeperUrl,
- String brokerUrl,
+ Optional<String> brokerUrl,
String sensorType,
int spoutParallelism,
int spoutNumTasks,
@@ -72,7 +72,8 @@ public class ParserTopologyBuilder {
int parserNumTasks,
int errorWriterParallelism,
int errorWriterNumTasks,
- Map<String, Object> kafkaSpoutConfig
+ Map<String, Object> kafkaSpoutConfig,
+ Optional<String> securityProtocol
) throws Exception {
// fetch configuration from zookeeper
@@ -81,19 +82,19 @@ public class ParserTopologyBuilder {
// create the spout
TopologyBuilder builder = new TopologyBuilder();
- KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, Optional.ofNullable(kafkaSpoutConfig) , parserConfig);
+ KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig) , parserConfig);
builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
.setNumTasks(spoutNumTasks);
// create the parser bolt
- ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, configs, parserConfig);
+ ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
builder.setBolt("parserBolt", parserBolt, parserParallelism)
.setNumTasks(parserNumTasks)
.shuffleGrouping("kafkaSpout");
// create the error bolt, if needed
if (errorWriterNumTasks > 0) {
- WriterBolt errorBolt = createErrorBolt(brokerUrl, sensorType, configs, parserConfig);
+ WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
.setNumTasks(errorWriterNumTasks)
.shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
@@ -111,7 +112,13 @@ public class ParserTopologyBuilder {
* @param parserConfig Configuration for the parser
* @return
*/
- private static StormKafkaSpout<Object, Object> createKafkaSpout(String zkQuorum, String sensorType, Optional<Map<String, Object>> kafkaConfigOptional, SensorParserConfig parserConfig) {
+ private static StormKafkaSpout<Object, Object> createKafkaSpout( String zkQuorum
+ , String sensorType
+ , Optional<String> securityProtocol
+ , Optional<Map<String, Object>> kafkaConfigOptional
+ , SensorParserConfig parserConfig
+ )
+ {
Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>());
String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic() : sensorType;
kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
@@ -120,9 +127,32 @@ public class ParserTopologyBuilder {
kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID
, inputTopic + "_parser"
);
+ if(securityProtocol.isPresent()) {
+ kafkaSpoutConfigOptions.putIfAbsent("security.protocol", securityProtocol.get());
+ }
return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, Arrays.asList("value"), kafkaSpoutConfigOptions);
}
+ private static KafkaWriter createKafkaWriter( Optional<String> broker
+ , String zkQuorum
+ , Optional<String> securityProtocol
+ )
+ {
+ KafkaWriter ret = null;
+ if(broker.isPresent()) {
+ ret = new KafkaWriter(broker.get());
+ }
+ else {
+ ret = new KafkaWriter().withZkQuorum(zkQuorum);
+ }
+ if(securityProtocol.isPresent()) {
+ HashMap<String, Object> config = new HashMap<>();
+ config.put("security.protocol", securityProtocol.get());
+ ret.withProducerConfigs(config);
+ }
+ return ret;
+ }
+
/**
* Create a bolt that parses input from a sensor.
*
@@ -133,7 +163,14 @@ public class ParserTopologyBuilder {
* @param parserConfig
* @return A Storm bolt that parses input from a sensor
*/
- private static ParserBolt createParserBolt(String zookeeperUrl, String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
+ private static ParserBolt createParserBolt( String zookeeperUrl
+ , Optional<String> brokerUrl
+ , String sensorType
+ , Optional<String> securityProtocol
+ , ParserConfigurations configs
+ , SensorParserConfig parserConfig
+ )
+ {
// create message parser
MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
@@ -141,7 +178,10 @@ public class ParserTopologyBuilder {
// create writer - if not configured uses a sensible default
AbstractWriter writer = parserConfig.getWriterClassName() == null ?
- new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC) :
+ createKafkaWriter( brokerUrl
+ , zookeeperUrl
+ , securityProtocol
+ ).withTopic(Constants.ENRICHMENT_TOPIC) :
ReflectionUtils.createInstance(parserConfig.getWriterClassName());
writer.configure(sensorType, new ParserWriterConfiguration(configs));
@@ -154,17 +194,30 @@ public class ParserTopologyBuilder {
/**
* Create a bolt that handles error messages.
*
+ * @param zookeeperUrl Kafka zookeeper URL
* @param brokerUrl Kafka Broker URL
* @param sensorType Type of sensor that is being consumed.
+ * @param securityProtocol Security protocol used (if any)
* @param configs
* @param parserConfig
* @return A Storm bolt that handles error messages.
*/
- private static WriterBolt createErrorBolt(String brokerUrl, String sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
+ private static WriterBolt createErrorBolt( String zookeeperUrl
+ , Optional<String> brokerUrl
+ , String sensorType
+ , Optional<String> securityProtocol
+ , ParserConfigurations configs
+ , SensorParserConfig parserConfig
+ )
+ {
// create writer - if not configured uses a sensible default
- AbstractWriter writer = parserConfig.getErrorWriterClassName() == null
- ? new KafkaWriter(brokerUrl).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")).withConfigPrefix("error")
+ AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ?
+ createKafkaWriter( brokerUrl
+ , zookeeperUrl
+ , securityProtocol
+ ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic"))
+ .withConfigPrefix("error")
: ReflectionUtils.createInstance(parserConfig.getWriterClassName());
writer.configure(sensorType, new ParserWriterConfiguration(configs));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 8cf921e..d83146f 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,10 +17,13 @@
*/
package org.apache.metron.parsers.topology;
+import com.google.common.collect.ImmutableList;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.hbase.security.AutoHBase;
+import org.apache.storm.hdfs.common.security.AutoHDFS;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -33,9 +36,7 @@ import org.apache.metron.parsers.topology.config.ConfigHandlers;
import java.io.File;
import java.io.IOException;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.function.Function;
public class ParserTopologyCLI {
@@ -55,7 +56,7 @@ public class ParserTopologyCLI {
BROKER_URL("k", code -> {
Option o = new Option(code, "kafka", true, "Kafka Broker URL");
o.setArgName("BROKER_URL");
- o.setRequired(true);
+ o.setRequired(false);
return o;
}),
SENSOR_TYPE("s", code -> {
@@ -176,6 +177,18 @@ public class ParserTopologyCLI {
return o;
}
)
+ ,SECURITY_PROTOCOL("ksp", code -> {
+ Option o = new Option(code
+ , "kafka_security_protocol"
+ , true
+ , "The kafka security protocol to use (if running with a kerberized cluster). E.g. PLAINTEXTSASL"
+ );
+ o.setArgName("SECURITY_PROTOCOL");
+ o.setRequired(false);
+ o.setType(String.class);
+ return o;
+ }
+ )
,TEST("t", code ->
{
Option o = new Option("t", "test", true, "Run in Test Mode");
@@ -270,7 +283,7 @@ public class ParserTopologyCLI {
System.exit(0);
}
String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);;
- String brokerUrl = ParserOptions.BROKER_URL.get(cmd);
+ Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
@@ -284,7 +297,8 @@ public class ParserTopologyCLI {
if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
}
-
+ Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
+ securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
brokerUrl,
sensorType,
@@ -294,10 +308,18 @@ public class ParserTopologyCLI {
parserNumTasks,
errorParallelism,
errorNumTasks,
- spoutConfig
+ spoutConfig,
+ securityProtocol
);
Config stormConf = ParserOptions.getConfig(cmd);
-
+ if(securityProtocol.isPresent() && !stormConf.containsKey(Config.TOPOLOGY_AUTO_CREDENTIALS)) {
+ //if I'm specifying it already, then I won't impose autohdfs and autohbase
+ List<String> autoCredentials = new ArrayList<>();
+ for (String credential : ImmutableList.of(AutoHDFS.class.getName(), AutoHBase.class.getName())) {
+ autoCredentials.add(credential);
+ }
+ stormConf.put( Config.TOPOLOGY_AUTO_CREDENTIALS , autoCredentials );
+ }
if (ParserOptions.TEST.has(cmd)) {
stormConf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
@@ -312,6 +334,21 @@ public class ParserTopologyCLI {
System.exit(-1);
}
}
+
+ private static Optional<String> getSecurityProtocol(Optional<String> protocol, Map<String, Object> spoutConfig) {
+ Optional<String> ret = protocol;
+ if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+ ret = Optional.empty();
+ }
+ if(!ret.isPresent()) {
+ ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
+ }
+ if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+ ret = Optional.empty();
+ }
+ return ret;
+ }
+
private static Map<String, Object> readSpoutConfig(File inputFile) {
String json = null;
if (inputFile.exists()) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 48bcbec..b6a76d0 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -33,10 +33,7 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
@@ -82,7 +79,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
public void start() throws UnableToStartException {
try {
TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk")
- , brokerUrl
+ , Optional.ofNullable(brokerUrl)
, sensorType
, 1
, 1
@@ -91,6 +88,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
, 1
, 1
, null
+ , Optional.empty()
);
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_DEBUG, true);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 5e70177..ac73a2b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -34,9 +34,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.Map;
+import java.util.*;
public class ParserTopologyCLITest {
@@ -75,6 +73,20 @@ public class ParserTopologyCLITest {
}
}
+ @Test
+ public void testNoOverlappingArgs() throws Exception {
+ Set<String> optionStrs = new HashSet<>();
+ for(ParserTopologyCLI.ParserOptions option : ParserTopologyCLI.ParserOptions.values()) {
+ if(optionStrs.contains(option.option.getLongOpt())) {
+ throw new IllegalStateException("Reused long option: " + option.option.getLongOpt());
+ }
+ if(optionStrs.contains(option.shortCode)) {
+ throw new IllegalStateException("Reused short option: " + option.shortCode);
+ }
+ optionStrs.add(option.option.getLongOpt());
+ optionStrs.add(option.shortCode);
+ }
+ }
@Test
public void testKafkaOffset_happyPath() throws ParseException {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
index 48810c5..6e51dc5 100644
--- a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
+++ b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
@@ -15,7 +15,9 @@
# limitations under the License.
spout.kafka.topic.pcap=pcap
+storm.auto.credentials=[]
kafka.zk=node1:2181
+kafka.security.protocol=PLAINTEXT
kafka.pcap.start=END
kafka.pcap.numPackets=1000
kafka.pcap.maxTimeMS=300000
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 732991b..2b7e0fd 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -17,6 +17,7 @@
name: "pcap"
config:
topology.workers: 1
+ topology.auto-credentials: ${storm.auto.credentials}
components:
@@ -36,6 +37,11 @@ components:
args:
- "group.id"
- "pcap"
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+
- id: "kafkaConfig"
className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
constructorArgs:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 8b292d7..84e7574 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -212,6 +212,8 @@ public class PcapTopologyIntegrationTest {
setProperty("kafka.pcap.numPackets", "2");
setProperty("kafka.pcap.maxTimeMS", "200000000");
setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
+ setProperty("storm.auto.credentials", "[]");
+ setProperty("kafka.security.protocol", "PLAINTEXT");
}};
updatePropertiesCallback.apply(topologyProperties);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties
index 35f368c..914832d 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -17,6 +17,7 @@
##### Storm #####
indexing.workers=1
indexing.executors=0
+storm.auto.credentials=[]
##### Kafka #####
@@ -24,6 +25,7 @@ kafka.zk=node1:2181
kafka.broker=node1:6667
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
##### Indexing #####
index.input.topic=indexing
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index f03ac41..ba3f96c 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -92,9 +92,7 @@ public class SourceHandler {
}
private void initialize(Map config) throws IOException {
- Configuration hdfsConfig = new Configuration();
this.fs = FileSystem.get(new Configuration());
- HdfsSecurityUtil.login(config, hdfsConfig);
this.currentFile = createOutputFile();
if(this.rotationPolicy instanceof TimedRotationPolicy){
long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 5c00e52..1884f5d 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -105,7 +105,12 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
}
public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) {
- this.producerConfigs = extraConfigs;
+ if(producerConfigs == null) {
+ this.producerConfigs = extraConfigs;
+ }
+ else if(extraConfigs != null){
+ producerConfigs.putAll(extraConfigs);
+ }
return this;
}
[3/3] incubator-metron git commit: METRON-797: Pass security.protocol
and enable auto-renew for the storm topologies closes
apache/incubator-metron#495
Posted by ce...@apache.org.
METRON-797: Pass security.protocol and enable auto-renew for the storm topologies closes apache/incubator-metron#495
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/aef84636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/aef84636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/aef84636
Branch: refs/heads/master
Commit: aef84636a3427da20b1d54acfe1b8de23e2aaf97
Parents: e03636d
Author: mmiklavc <mi...@gmail.com>
Authored: Wed Mar 29 09:23:09 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Mar 30 18:16:06 2017 -0400
----------------------------------------------------------------------
.../metron-profiler/src/main/config/profiler.properties | 2 +-
.../metron-profiler/src/main/flux/profiler/remote.yaml | 2 +-
.../METRON/CURRENT/package/templates/enrichment.properties.j2 | 2 +-
.../metron-elasticsearch/src/main/config/elasticsearch.properties | 2 +-
.../metron-enrichment/src/main/config/enrichment.properties | 2 +-
.../metron-enrichment/src/main/flux/enrichment/remote.yaml | 2 +-
metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml | 2 +-
7 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index b79ac73..f020b30 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -20,7 +20,7 @@
##### Storm #####
-storm.auto.credentials=[]
+topology.worker.childopts=
##### Profiler #####
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 0b14bce..4c96b08 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,7 +17,7 @@
name: "profiler"
config:
- topology.auto-credentials: ${storm.auto.credentials}
+ topology.worker.childopts: ${topology.worker.childopts}
topology.workers: ${profiler.workers}
topology.acker.executors: ${profiler.executors}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
index 8fc2335..508bce9 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
@@ -20,7 +20,7 @@
kafka.zk={{zookeeper_quorum}}
kafka.broker={{kafka_brokers}}
kafka.security.protocol=PLAINTEXT
-storm.auto.credentials=[]
+topology.worker.childopts=
enrichment.output.topic=indexing
enrichment.error.topic=enrichments_error
threat.intel.error.topic=threatintel_error
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index d45d3d4..47a5092 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -17,7 +17,7 @@
##### Storm #####
indexing.workers=1
indexing.executors=0
-storm.auto.credentials=[]
+topology.worker.childopts=
##### Kafka #####
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-platform/metron-enrichment/src/main/config/enrichment.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
index af5b27b..e293201 100644
--- a/metron-platform/metron-enrichment/src/main/config/enrichment.properties
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
@@ -20,7 +20,7 @@
kafka.zk=node1:2181
kafka.broker=node1:6667
kafka.security.protocol=PLAINTEXT
-storm.auto.credentials=[]
+topology.worker.childopts=
enrichment.output.topic=indexing
enrichment.error.topic=indexing
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
index 51fc7ce..82863ed 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
@@ -18,7 +18,7 @@ name: "enrichment"
config:
topology.workers: 1
topology.acker.executors: 0
- topology.auto-credentials: ${storm.auto.credentials}
+ topology.worker.childopts: ${topology.worker.childopts}
components:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/aef84636/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
index ec423c5..25ecdab 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -19,7 +19,7 @@ name: "indexing"
config:
topology.workers: ${indexing.workers}
topology.acker.executors: ${indexing.executors}
- topology.auto-credentials: ${storm.auto.credentials}
+ topology.worker.childopts: ${topology.worker.childopts}
components:
[2/3] incubator-metron git commit: METRON-797: Pass security.protocol
and enable auto-renew for the storm topologies
Posted by ce...@apache.org.
METRON-797: Pass security.protocol and enable auto-renew for the storm topologies
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/e03636d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/e03636d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/e03636d8
Branch: refs/heads/master
Commit: e03636d8d3fa18bbd3ed53cd55e66b81eb84603d
Parents: dae102b
Author: justinleet <ju...@gmail.com>
Authored: Wed Mar 29 09:16:34 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Wed Mar 29 09:16:34 2017 -0400
----------------------------------------------------------------------
.../apache/metron/parsers/topology/ParserTopologyCLI.java | 9 ---------
1 file changed, 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/e03636d8/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index d83146f..7523333 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,7 +17,6 @@
*/
package org.apache.metron.parsers.topology;
-import com.google.common.collect.ImmutableList;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
@@ -312,14 +311,6 @@ public class ParserTopologyCLI {
securityProtocol
);
Config stormConf = ParserOptions.getConfig(cmd);
- if(securityProtocol.isPresent() && !stormConf.containsKey(Config.TOPOLOGY_AUTO_CREDENTIALS)) {
- //if I'm specifying it already, then I won't impose autohdfs and autohbase
- List<String> autoCredentials = new ArrayList<>();
- for (String credential : ImmutableList.of(AutoHDFS.class.getName(), AutoHBase.class.getName())) {
- autoCredentials.add(credential);
- }
- stormConf.put( Config.TOPOLOGY_AUTO_CREDENTIALS , autoCredentials );
- }
if (ParserOptions.TEST.has(cmd)) {
stormConf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();