You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:06:00 UTC
[18/43] incubator-metron git commit: METRON-56 Create unified
enrichment topology (merrimanr via cestella) closes
apache/incubator-metron#33
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
new file mode 100644
index 0000000..83ecd42
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+
+import com.google.common.base.Function;
+import kafka.Kafka;
+import kafka.admin.AdminUtils;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.*;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+
+public class KafkaWithZKComponent implements InMemoryComponent {
+
+
+ public static class Topic {
+ public int numPartitions;
+ public String name;
+
+ public Topic(String name, int numPartitions) {
+ this.numPartitions = numPartitions;
+ this.name = name;
+ }
+ }
+ private transient KafkaServer kafkaServer;
+ private transient EmbeddedZookeeper zkServer;
+ private transient ZkClient zkClient;
+ private transient ConsumerConnector consumer;
+ private String zookeeperConnectString;
+ private int brokerPort = 6667;
+ private List<Topic> topics = Collections.emptyList();
+ private Function<KafkaWithZKComponent, Void> postStartCallback;
+
+ public KafkaWithZKComponent withPostStartCallback(Function<KafkaWithZKComponent, Void> f) {
+ postStartCallback = f;
+ return this;
+ }
+
+ public KafkaWithZKComponent withExistingZookeeper(String zookeeperConnectString) {
+ this.zookeeperConnectString = zookeeperConnectString;
+ return this;
+ }
+
+ public KafkaWithZKComponent withBrokerPort(int brokerPort) {
+ if(brokerPort <= 0)
+ {
+ brokerPort = TestUtils.choosePort();
+ }
+ this.brokerPort = brokerPort;
+ return this;
+ }
+
+ public KafkaWithZKComponent withTopics(List<Topic> topics) {
+ this.topics = topics;
+ return this;
+ }
+
+ public List<Topic> getTopics() {
+ return topics;
+ }
+
+ public int getBrokerPort() {
+ return brokerPort;
+ }
+
+
+ public String getBrokerList() {
+ return "localhost:" + brokerPort;
+ }
+
+ public KafkaProducer<String, byte[]> createProducer()
+ {
+ return createProducer(new HashMap<String, Object>());
+ }
+
+ public KafkaProducer<String, byte[]> createProducer(Map<String, Object> properties)
+ {
+ Map<String, Object> producerConfig = new HashMap<>();
+ producerConfig.put("bootstrap.servers", getBrokerList());
+ producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put("request.required.acks", "-1");
+ producerConfig.put("fetch.message.max.bytes", ""+ 1024*1024*10);
+ producerConfig.put("replica.fetch.max.bytes", "" + 1024*1024*10);
+ producerConfig.put("message.max.bytes", "" + 1024*1024*10);
+ producerConfig.put("message.send.max.retries", "10");
+ producerConfig.putAll(properties);
+ return new KafkaProducer<>(producerConfig);
+ }
+
+ @Override
+ public void start() {
+ // setup Zookeeper
+ if(zookeeperConnectString == null) {
+ String zkConnect = TestZKUtils.zookeeperConnect();
+ zkServer = new EmbeddedZookeeper(zkConnect);
+ zookeeperConnectString = zkServer.connectString();
+ }
+ zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+
+ // setup Broker
+ Properties props = TestUtils.createBrokerConfig(0, brokerPort, true);
+ KafkaConfig config = new KafkaConfig(props);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+ for(Topic topic : getTopics()) {
+ try {
+ createTopic(topic.name, topic.numPartitions, true);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Unable to create topic", e);
+ }
+ }
+ postStartCallback.apply(this);
+ }
+
+ public String getZookeeperConnect() {
+ return zookeeperConnectString;
+ }
+
+ @Override
+ public void stop() {
+ kafkaServer.shutdown();
+ zkClient.close();
+ if(zkServer != null) {
+ zkServer.shutdown();
+ }
+
+ }
+
+ public List<byte[]> readMessages(String topic) {
+ SimpleConsumer consumer = new SimpleConsumer("localhost", 6667, 100000, 64 * 1024, "consumer");
+ FetchRequest req = new FetchRequestBuilder()
+ .clientId("consumer")
+ .addFetch(topic, 0, 0, 100000)
+ .build();
+ FetchResponse fetchResponse = consumer.fetch(req);
+ Iterator<MessageAndOffset> results = fetchResponse.messageSet(topic, 0).iterator();
+ List<byte[]> messages = new ArrayList<>();
+ while(results.hasNext()) {
+ ByteBuffer payload = results.next().message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ messages.add(bytes);
+ }
+ return messages;
+ }
+
+ public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic) {
+ return getStreamIterator(topic, "group0", "consumer0");
+ }
+ public ConsumerIterator<byte[], byte[]> getStreamIterator(String topic, String group, String consumerName) {
+ // setup simple consumer
+ Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), group, consumerName, -1);
+ consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
+ Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
+ topicCountMap.put(topic, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
+ ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
+ return iterator;
+ }
+
+ public void shutdownConsumer() {
+ consumer.shutdown();
+ }
+
+ public void createTopic(String name) throws InterruptedException {
+ createTopic(name, 1, true);
+ }
+
+ public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+ List<KafkaServer> servers = new ArrayList<>();
+ servers.add(kafkaServer);
+ for(int part = 0;part < numPartitions;++part) {
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000);
+ }
+ }
+
+ public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
+ AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
+ if(waitUntilMetadataIsPropagated) {
+ waitUntilMetadataIsPropagated(name, numPartitions);
+ }
+ }
+
+ public void writeMessages(String topic, List<byte[]> messages) {
+ KafkaProducer<String, byte[]> kafkaProducer = createProducer();
+ for(byte[] message: messages) {
+ kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));
+ }
+ kafkaProducer.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java
new file mode 100644
index 0000000..bf2ef4f
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/KafkaUtil.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.util;
+
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaUtil {
+ public static <K,V> void send(Producer<K,V> producer, K key, V value, String topic) {
+ producer.send(new KeyedMessage<>(topic, key,value));
+ }
+
+ public static <K,V> void send(Producer<K,V> producer, Iterable<Map.Entry<K,V>> messages, String topic) {
+ for(Map.Entry<K,V> kv : messages) {
+ send(producer, kv.getKey(), kv.getValue(), topic);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java
new file mode 100644
index 0000000..4f53e5a
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/KafkaLoader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.flux.Flux;
+import storm.kafka.SpoutConfig;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaLoader {
+
+ private String brokerUrl;
+ private String topic;
+ private String samplePath;
+ private int delay = 1000;
+ private int iterations = -1;
+ private KafkaProducer kafkaProducer;
+
+ public KafkaLoader(String brokerUrl, String topic, String samplePath) {
+ this.brokerUrl = brokerUrl;
+ this.topic = topic;
+ this.samplePath = samplePath;
+ }
+
+ public KafkaLoader withDelay(int delay) {
+ this.delay = delay;
+ return this;
+ }
+
+ public KafkaLoader withIterations(int iterations) {
+ this.iterations = iterations;
+ return this;
+ }
+
+ public void start() {
+ Map<String, Object> producerConfig = new HashMap<>();
+ producerConfig.put("bootstrap.servers", brokerUrl);
+ producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ kafkaProducer = new KafkaProducer<>(producerConfig);
+ try {
+ while (iterations == -1 || iterations-- > 0) {
+ BufferedReader reader = new BufferedReader(new FileReader(samplePath));
+ String line;
+ while((line = reader.readLine()) != null) {
+ kafkaProducer.send(new ProducerRecord<String, String>(topic, line));
+ Thread.sleep(delay);
+ }
+ reader.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void stop() {
+ kafkaProducer.close();
+ }
+
+
+ public static void main(String[] args) {
+ KafkaLoader kafkaLoader = new KafkaLoader(args[0], args[1], args[2]);
+ if (args.length > 3) kafkaLoader.withDelay(Integer.parseInt(args[3]));
+ if (args.length > 4) kafkaLoader.withIterations(Integer.parseInt(args[4]));
+ kafkaLoader.start();
+ kafkaLoader.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
new file mode 100644
index 0000000..ef8b2e2
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
+public class SourceConfigUtils {
+
+ public static CuratorFramework getClient(String zookeeperUrl) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+ }
+
+ public static void writeToZookeeperFromFile(String sourceName, String filePath, String zookeeperUrl) throws Exception {
+ writeToZookeeper(sourceName, Files.readAllBytes(Paths.get(filePath)), zookeeperUrl);
+ }
+
+ public static void writeToZookeeper(String sourceName, byte[] configData, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ client.setData().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + sourceName, configData);
+ } catch(KeeperException.NoNodeException e) {
+ client.create().creatingParentsIfNeeded().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + sourceName, configData);
+ }
+ client.close();
+ }
+
+ public static byte[] readConfigBytesFromZookeeper(String sourceName, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ byte[] data = client.getData().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + sourceName);
+ client.close();
+ return data;
+ }
+
+ public static SourceConfig readConfigFromZookeeper(String sourceName, String zookeeperUrl) throws Exception {
+ byte[] data = readConfigBytesFromZookeeper(sourceName, zookeeperUrl);
+ return SourceConfig.load(new ByteArrayInputStream(data));
+ }
+
+ public static void dumpConfigs(String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT);
+ for(String child: children) {
+ byte[] data = client.getData().forPath(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + child);
+ System.out.println("Config for source " + child);
+ System.out.println(new String(data));
+ System.out.println();
+ }
+ client.close();
+ }
+
+ public static void main(String[] args) {
+ try {
+ File root = new File("./metron-streaming/Metron-Common/src/test/resources/config/source/");
+ for(File child: root.listFiles()) {
+ writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), "node1:2181");
+ }
+ SourceConfigUtils.dumpConfigs("node1:2181");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
deleted file mode 100644
index 7473b01..0000000
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/local.yaml
+++ /dev/null
@@ -1,401 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "asa-local"
-config:
- topology.workers: 1
-
-components:
- - id: "parser"
- className: "org.apache.metron.parsing.parsers.GrokAsaParser"
- - id: "jdbcConfig"
- className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
- properties:
- - name: "host"
- value: "${mysql.ip}"
- - name: "port"
- value: ${mysql.port}
- - name: "username"
- value: "${mysql.username}"
- - name: "password"
- value: "${mysql.password}"
- - name: "table"
- value: "GEO"
- - id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
- configMethods:
- - name: "withJdbcConfig"
- args:
- - ref: "jdbcConfig"
- - id: "geoEnrichment"
- className: "org.apache.metron.domain.Enrichment"
- properties:
- - name: "name"
- value: "geo"
- - name: "fields"
- value: ["ip_src_addr", "ip_dst_addr"]
- - name: "adapter"
- ref: "geoEnrichmentAdapter"
- - id: "hostEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
- constructorArgs:
- - '${org.apache.metron.enrichment.host.known_hosts}'
- - id: "hostEnrichment"
- className: "org.apache.metron.domain.Enrichment"
- properties:
- - name: "name"
- value: "host"
- - name: "fields"
- value: ["ip_src_addr", "ip_dst_addr"]
- - name: "adapter"
- ref: "hostEnrichmentAdapter"
- - id: "enrichments"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - ref: "geoEnrichment"
- - name: "add"
- args:
- - ref: "hostEnrichment"
- - id: "indexAdapter"
- className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- - id: "alertsConfig"
- className: "java.util.HashMap"
- configMethods:
- - name: "put"
- args: ["whitelist_table_name", "ip_whitelist"]
- - name: "put"
- args: ["blacklist_table_name", "ip_blacklist"]
- - name: "put"
- args: ["quorum", "mon.cluster2.ctolab.hortonworks.com, nn1.cluster2.ctolab.hortonworks.com, nn2.cluster2.ctolab.hortonworks.com"]
- - name: "put"
- args: ["port", "2181"]
- - name: "put"
- args: ["_MAX_CACHE_SIZE_OBJECTS_NUM", "3600"]
- - name: "put"
- args: ["_MAX_TIME_RETAIN_MINUTES", "1000"]
- - id: "alertsAdapter"
- className: "org.apache.metron.alerts.adapters.CIFAlertsAdapter"
- constructorArgs:
- - ref: "alertsConfig"
- - id: "alertsIdentifier"
- className: "org.json.simple.JSONObject"
- configMethods:
- - name: "put"
- args: ["environment", "local"]
- - name: "put"
- args: ["topology", "asa"]
- - id: "metricConfig"
- className: "org.apache.commons.configuration.BaseConfiguration"
- configMethods:
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.graphite"
- - "${org.apache.metron.metrics.reporter.graphite}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.console"
- - "${org.apache.metron.metrics.reporter.console}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.jmx"
- - "${org.apache.metron.metrics.reporter.jmx}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.address"
- - "${org.apache.metron.metrics.graphite.address}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.port"
- - "${org.apache.metron.metrics.graphite.port}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.acks"
- - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.emits"
- - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.fails"
- - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
-
-spouts:
- - id: "testingSpout"
- className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
- parallelism: 1
- configMethods:
- - name: "withFilename"
- args:
- - "SampleInput/AsaOutput"
- - name: "withRepeating"
- args:
- - true
-
-bolts:
- - id: "parserBolt"
- className: "org.apache.metron.bolt.TelemetryParserBolt"
- configMethods:
- - name: "withMessageParser"
- args:
- - ref: "parser"
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- - id: "indexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "asa_index"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM.dd.hh"
- - name: "withDocumentName"
- args:
- - "asa_doc"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "alertsBolt"
- className: "org.apache.metron.alerts.TelemetryAlertsBolt"
- configMethods:
- - name: "withIdentifier"
- args:
- - ref: "alertsIdentifier"
- - name: "withMaxCacheSize"
- args: [1000]
- - name: "withMaxTimeRetain"
- args: [3600]
- - name: "withAlertsAdapter"
- args:
- - ref: "alertsAdapter"
- - name: "withOutputFieldName"
- args: ["message"]
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "alertsIndexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "alert"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM.ww"
- - name: "withDocumentName"
- args:
- - "asa_alert"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "errorIndexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "error"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM"
- - name: "withDocumentName"
- args:
- - "asa_error"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "geoEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "hostEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- - id: "joinBolt"
- className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
- configMethods:
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
-streams:
- - name: "spout -> parser"
- from: "testingSpout"
- to: "parserBolt"
- grouping:
- type: SHUFFLE
- - name: "parser -> host"
- from: "parserBolt"
- to: "hostEnrichmentBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["key"]
- - name: "parser -> geo"
- from: "parserBolt"
- to: "geoEnrichmentBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["key"]
- - name: "parser -> join"
- from: "parserBolt"
- to: "joinBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "geo -> join"
- from: "geoEnrichmentBolt"
- to: "joinBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["key"]
- - name: "host -> join"
- from: "hostEnrichmentBolt"
- to: "joinBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["key"]
- - name: "join -> alerts"
- from: "joinBolt"
- to: "alertsBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "alerts -> alertsIndexing"
- from: "alertsBolt"
- to: "alertsIndexingBolt"
- grouping:
- streamId: "message"
- type: SHUFFLE
- - name: "join -> indexing"
- from: "joinBolt"
- to: "indexingBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "parser -> errors"
- from: "parserBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
- - name: "indexing -> errors"
- from: "indexingBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
- - name: "alerts -> errors"
- from: "alertsBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
index 94694ab..78c68d5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/remote.yaml
@@ -18,146 +18,14 @@ name: "asa"
config:
topology.workers: 1
+
components:
- id: "parser"
className: "org.apache.metron.parsing.parsers.GrokAsaParser"
- - id: "jdbcConfig"
- className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
- properties:
- - name: "host"
- value: "${mysql.ip}"
- - name: "port"
- value: ${mysql.port}
- - name: "username"
- value: "${mysql.username}"
- - name: "password"
- value: "${mysql.password}"
- - name: "table"
- value: "GEO"
- - id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
- configMethods:
- - name: "withJdbcConfig"
- args:
- - ref: "jdbcConfig"
- - id: "geoEnrichment"
- className: "org.apache.metron.domain.Enrichment"
- properties:
- - name: "name"
- value: "geo"
- - name: "fields"
- value: ["ip_src_addr", "ip_dst_addr"]
- - name: "adapter"
- ref: "geoEnrichmentAdapter"
- - id: "hostEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
- constructorArgs:
- - '${org.apache.metron.enrichment.host.known_hosts}'
- - id: "hostEnrichment"
- className: "org.apache.metron.domain.Enrichment"
- properties:
- - name: "name"
- value: "host"
- - name: "fields"
- value: ["ip_src_addr", "ip_dst_addr"]
- - name: "adapter"
- ref: "hostEnrichmentAdapter"
- - id: "enrichments"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - ref: "geoEnrichment"
- - name: "add"
- args:
- - ref: "hostEnrichment"
- - id: "indexAdapter"
- className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- - id: "alertsConfig"
- className: "java.util.HashMap"
- configMethods:
- - name: "put"
- args: ["whitelist_table_name", "ip_whitelist"]
- - name: "put"
- args: ["blacklist_table_name", "ip_blacklist"]
- - name: "put"
- args: ["quorum", "mon.cluster2.ctolab.hortonworks.com, nn1.cluster2.ctolab.hortonworks.com, nn2.cluster2.ctolab.hortonworks.com"]
- - name: "put"
- args: ["port", "2181"]
- - name: "put"
- args: ["_MAX_CACHE_SIZE_OBJECTS_NUM", "3600"]
- - name: "put"
- args: ["_MAX_TIME_RETAIN_MINUTES", "1000"]
- - id: "alertsAdapter"
- className: "org.apache.metron.alerts.adapters.CIFAlertsAdapter"
+ - id: "writer"
+ className: "org.apache.metron.writer.KafkaWriter"
constructorArgs:
- - ref: "alertsConfig"
- - id: "alertsIdentifier"
- className: "org.json.simple.JSONObject"
- configMethods:
- - name: "put"
- args: ["environment", "local"]
- - name: "put"
- args: ["topology", "asa"]
- - id: "metricConfig"
- className: "org.apache.commons.configuration.BaseConfiguration"
- configMethods:
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.graphite"
- - "${org.apache.metron.metrics.reporter.graphite}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.console"
- - "${org.apache.metron.metrics.reporter.console}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.jmx"
- - "${org.apache.metron.metrics.reporter.jmx}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.address"
- - "${org.apache.metron.metrics.graphite.address}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.port"
- - "${org.apache.metron.metrics.graphite.port}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.acks"
- - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.emits"
- - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.fails"
- - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
+ - "${kafka.broker}"
- id: "zkHosts"
className: "storm.kafka.ZkHosts"
constructorArgs:
@@ -168,18 +36,30 @@ components:
# zookeeper hosts
- ref: "zkHosts"
# topic name
- - "${spout.kafka.topic.pcap}"
+ - "${spout.kafka.topic.asa}"
# zk root
- ""
# id
- - "${spout.kafka.topic.pcap}"
+ - "${spout.kafka.topic.asa}"
properties:
- - name: "forceFromStart"
+ - name: "ignoreZkOffsets"
value: true
- name: "startOffsetTime"
value: -1
+ - name: "socketTimeoutMs"
+ value: 1000000
spouts:
+ - id: "testingSpout"
+ className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+ parallelism: 1
+ configMethods:
+ - name: "withFilename"
+ args:
+ - "SampleInput/YafExampleOutput"
+ - name: "withRepeating"
+ args:
+ - true
- id: "kafkaSpout"
className: "storm.kafka.KafkaSpout"
constructorArgs:
@@ -187,229 +67,16 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.bolt.TelemetryParserBolt"
- configMethods:
- - name: "withMessageParser"
- args:
- - ref: "parser"
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- - id: "indexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "asa_index"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM.dd.hh"
- - name: "withDocumentName"
- args:
- - "asa_doc"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "alertsBolt"
- className: "org.apache.metron.alerts.TelemetryAlertsBolt"
- configMethods:
- - name: "withIdentifier"
- args:
- - ref: "alertsIdentifier"
- - name: "withMaxCacheSize"
- args: [1000]
- - name: "withMaxTimeRetain"
- args: [3600]
- - name: "withAlertsAdapter"
- args:
- - ref: "alertsAdapter"
- - name: "withOutputFieldName"
- args: ["message"]
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "alertsIndexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "alert"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM.ww"
- - name: "withDocumentName"
- args:
- - "asa_alert"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "errorIndexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "error"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM"
- - name: "withDocumentName"
- args:
- - "asa_error"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "geoEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "hostEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- - id: "joinBolt"
- className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
- configMethods:
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
+ className: "org.apache.metron.bolt.ParserBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ - "yaf"
+ - ref: "parser"
+ - ref: "writer"
streams:
- - name: "spout -> parser"
+ - name: "spout -> bolt"
from: "kafkaSpout"
to: "parserBolt"
grouping:
type: SHUFFLE
- - name: "parser -> host"
- from: "parserBolt"
- to: "hostEnrichmentBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["key"]
- - name: "parser -> geo"
- from: "parserBolt"
- to: "geoEnrichmentBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["key"]
- - name: "parser -> join"
- from: "parserBolt"
- to: "joinBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "geo -> join"
- from: "geoEnrichmentBolt"
- to: "joinBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["key"]
- - name: "host -> join"
- from: "hostEnrichmentBolt"
- to: "joinBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["key"]
- - name: "join -> alerts"
- from: "joinBolt"
- to: "alertsBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "alerts -> alertsIndexing"
- from: "alertsBolt"
- to: "alertsIndexingBolt"
- grouping:
- streamId: "message"
- type: SHUFFLE
- - name: "join -> indexing"
- from: "joinBolt"
- to: "indexingBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "parser -> errors"
- from: "parserBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
- - name: "indexing -> errors"
- from: "indexingBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
- - name: "alerts -> errors"
- from: "alertsBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml
new file mode 100644
index 0000000..9114d94
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/asa/test.yaml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "asa-test"
+config:
+ topology.workers: 1
+
+
+components:
+ - id: "parser"
+ className: "org.apache.metron.parsing.parsers.GrokAsaParser"
+ - id: "writer"
+ className: "org.apache.metron.writer.KafkaWriter"
+ constructorArgs:
+ - "${kafka.broker}"
+ - id: "zkHosts"
+ className: "storm.kafka.ZkHosts"
+ constructorArgs:
+ - "${kafka.zk}"
+ - id: "kafkaConfig"
+ className: "storm.kafka.SpoutConfig"
+ constructorArgs:
+ # zookeeper hosts
+ - ref: "zkHosts"
+ # topic name
+ - "${spout.kafka.topic.asa}"
+ # zk root
+ - ""
+ # id
+ - "${spout.kafka.topic.asa}"
+ properties:
+ - name: "ignoreZkOffsets"
+ value: true
+ - name: "startOffsetTime"
+ value: -2
+ - name: "socketTimeoutMs"
+ value: 1000000
+
+spouts:
+ - id: "testingSpout"
+ className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+ parallelism: 1
+ configMethods:
+ - name: "withFilename"
+ args:
+ - "SampleInput/YafExampleOutput"
+ - name: "withRepeating"
+ args:
+ - false
+ - id: "kafkaSpout"
+ className: "storm.kafka.KafkaSpout"
+ constructorArgs:
+ - ref: "kafkaConfig"
+
+bolts:
+ - id: "parserBolt"
+ className: "org.apache.metron.bolt.ParserBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ - "yaf"
+ - ref: "parser"
+ - ref: "writer"
+
+streams:
+ - name: "spout -> bolt"
+ from: "kafkaSpout"
+ to: "parserBolt"
+ grouping:
+ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
deleted file mode 100644
index 851f9d9..0000000
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/local.yaml
+++ /dev/null
@@ -1,192 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "bro-local"
-config:
- topology.workers: 1
-
-components:
- - id: "broParser"
- className: "org.apache.metron.parsing.parsers.BasicBroParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "indexAdapter"
- className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- - id: "metricConfig"
- className: "org.apache.commons.configuration.BaseConfiguration"
- configMethods:
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.graphite"
- - "${org.apache.metron.metrics.reporter.graphite}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.console"
- - "${org.apache.metron.metrics.reporter.console}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.jmx"
- - "${org.apache.metron.metrics.reporter.jmx}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.address"
- - "${org.apache.metron.metrics.graphite.address}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.port"
- - "${org.apache.metron.metrics.graphite.port}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.acks"
- - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.emits"
- - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.fails"
- - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
-
-spouts:
- - id: "testingSpout"
- className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
- parallelism: 1
- configMethods:
- - name: "withFilename"
- args:
- - "SampleInput/BroExampleOutput"
- - name: "withRepeating"
- args:
- - true
-
-bolts:
- - id: "parserBolt"
- className: "org.apache.metron.bolt.TelemetryParserBolt"
- configMethods:
- - name: "withMessageParser"
- args:
- - ref: "broParser"
- - id: "indexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "bro_index"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM.dd.hh"
- - name: "withDocumentName"
- args:
- - "bro_doc"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "errorIndexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "error"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM"
- - name: "withDocumentName"
- args:
- - "bro_error"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
-
-streams:
- - name: "spout -> parser"
- from: "testingSpout"
- to: "parserBolt"
- grouping:
- type: SHUFFLE
- - name: "parser -> indexing"
- from: "parserBolt"
- to: "indexingBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "parser -> errors"
- from: "parserBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
- - name: "indexing -> errors"
- from: "indexingBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index 96d836e..fb594b5 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -18,72 +18,14 @@ name: "bro"
config:
topology.workers: 1
+
components:
- - id: "broParser"
+ - id: "parser"
className: "org.apache.metron.parsing.parsers.BasicBroParser"
- - id: "genericMessageFilter"
- className: "org.apache.metron.filters.GenericMessageFilter"
- - id: "indexAdapter"
- className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
- - id: "metricConfig"
- className: "org.apache.commons.configuration.BaseConfiguration"
- configMethods:
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.graphite"
- - "${org.apache.metron.metrics.reporter.graphite}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.console"
- - "${org.apache.metron.metrics.reporter.console}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.reporter.jmx"
- - "${org.apache.metron.metrics.reporter.jmx}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.address"
- - "${org.apache.metron.metrics.graphite.address}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.graphite.port"
- - "${org.apache.metron.metrics.graphite.port}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.acks"
- - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.emits"
- - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryParserBolt.fails"
- - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
- - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
- - name: "setProperty"
- args:
- - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
- - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
+ - id: "writer"
+ className: "org.apache.metron.writer.KafkaWriter"
+ constructorArgs:
+ - "${kafka.broker}"
- id: "zkHosts"
className: "storm.kafka.ZkHosts"
constructorArgs:
@@ -100,12 +42,24 @@ components:
# id
- "${spout.kafka.topic.bro}"
properties:
- - name: "forceFromStart"
+ - name: "ignoreZkOffsets"
value: true
- name: "startOffsetTime"
value: -1
+ - name: "socketTimeoutMs"
+ value: 1000000
spouts:
+ - id: "testingSpout"
+ className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+ parallelism: 1
+ configMethods:
+ - name: "withFilename"
+ args:
+ - "SampleInput/YafExampleOutput"
+ - name: "withRepeating"
+ args:
+ - true
- id: "kafkaSpout"
className: "storm.kafka.KafkaSpout"
constructorArgs:
@@ -113,94 +67,16 @@ spouts:
bolts:
- id: "parserBolt"
- className: "org.apache.metron.bolt.TelemetryParserBolt"
- configMethods:
- - name: "withMessageParser"
- args:
- - ref: "broParser"
- - id: "indexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "bro_index"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM.dd.hh"
- - name: "withDocumentName"
- args:
- - "bro_doc"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
- - id: "errorIndexingBolt"
- className: "org.apache.metron.indexing.TelemetryIndexingBolt"
- configMethods:
- - name: "withIndexIP"
- args:
- - "${es.ip}"
- - name: "withIndexPort"
- args:
- - ${es.port}
- - name: "withClusterName"
- args:
- - "${es.clustername}"
- - name: "withIndexName"
- args:
- - "error"
- - name: "withIndexTimestamp"
- args:
- - "yyyy.MM"
- - name: "withDocumentName"
- args:
- - "bro_error"
- - name: "withBulk"
- args:
- - 1
- - name: "withIndexAdapter"
- args:
- - ref: "indexAdapter"
- - name: "withMetricConfiguration"
- args:
- - ref: "metricConfig"
+ className: "org.apache.metron.bolt.ParserBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ - "yaf"
+ - ref: "parser"
+ - ref: "writer"
streams:
- - name: "spout -> parser"
+ - name: "spout -> bolt"
from: "kafkaSpout"
to: "parserBolt"
grouping:
type: SHUFFLE
- - name: "parser -> indexing"
- from: "parserBolt"
- to: "indexingBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
- - name: "parser -> errors"
- from: "parserBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
- - name: "indexing -> errors"
- from: "indexingBolt"
- to: "errorIndexingBolt"
- grouping:
- streamId: "error"
- type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
new file mode 100644
index 0000000..3bd3eed
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/test.yaml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "bro-test"
+config:
+ topology.workers: 1
+
+
+components:
+ - id: "parser"
+ className: "org.apache.metron.parsing.parsers.BasicBroParser"
+ - id: "writer"
+ className: "org.apache.metron.writer.KafkaWriter"
+ constructorArgs:
+ - "${kafka.broker}"
+ - id: "zkHosts"
+ className: "storm.kafka.ZkHosts"
+ constructorArgs:
+ - "${kafka.zk}"
+ - id: "kafkaConfig"
+ className: "storm.kafka.SpoutConfig"
+ constructorArgs:
+ # zookeeper hosts
+ - ref: "zkHosts"
+ # topic name
+ - "${spout.kafka.topic.bro}"
+ # zk root
+ - ""
+ # id
+ - "${spout.kafka.topic.bro}"
+ properties:
+ - name: "ignoreZkOffsets"
+ value: true
+ - name: "startOffsetTime"
+ value: -2
+ - name: "socketTimeoutMs"
+ value: 1000000
+
+spouts:
+ - id: "testingSpout"
+ className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+ parallelism: 1
+ configMethods:
+ - name: "withFilename"
+ args:
+ - "SampleInput/YafExampleOutput"
+ - name: "withRepeating"
+ args:
+ - false
+ - id: "kafkaSpout"
+ className: "storm.kafka.KafkaSpout"
+ constructorArgs:
+ - ref: "kafkaConfig"
+
+bolts:
+ - id: "parserBolt"
+ className: "org.apache.metron.bolt.ParserBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ - "yaf"
+ - ref: "parser"
+ - ref: "writer"
+
+streams:
+ - name: "spout -> bolt"
+ from: "kafkaSpout"
+ to: "parserBolt"
+ grouping:
+ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
new file mode 100644
index 0000000..8033374
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -0,0 +1,331 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "enrichment"
+config:
+ topology.workers: 1
+
+components:
+# Enrichment
+ - id: "jdbcConfig"
+ className: "org.apache.metron.enrichment.adapters.jdbc.MySqlConfig"
+ properties:
+ - name: "host"
+ value: "${mysql.ip}"
+ - name: "port"
+ value: ${mysql.port}
+ - name: "username"
+ value: "${mysql.username}"
+ - name: "password"
+ value: "${mysql.password}"
+ - name: "table"
+ value: "GEO"
+ - id: "geoEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
+ configMethods:
+ - name: "withJdbcConfig"
+ args:
+ - ref: "jdbcConfig"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ constructorArgs:
+ - "geo"
+ - ref: "geoEnrichmentAdapter"
+ - id: "hostEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+ constructorArgs:
+ - '${org.apache.metron.enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ constructorArgs:
+ - "host"
+ - ref: "hostEnrichmentAdapter"
+ - id: "enrichments"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "geoEnrichment"
+ - name: "add"
+ args:
+ - ref: "hostEnrichment"
+
+# Threat Intel
+ - id: "ipThreatIntelConfig"
+ className: "org.apache.metron.threatintel.ThreatIntelConfig"
+ configMethods:
+ - name: "withProviderImpl"
+ args:
+ - "${hbase.provider.impl}"
+ - name: "withTrackerHBaseTable"
+ args:
+ - "${threat.intel.tracker.table}"
+ - name: "withTrackerHBaseCF"
+ args:
+ - "${threat.intel.tracker.cf}"
+ - name: "withHBaseTable"
+ args:
+ - "${threat.intel.ip.table}"
+ - name: "withHBaseCF"
+ args:
+ - "${threat.intel.ip.cf}"
+ - id: "ipThreatIntelAdapter"
+ className: "org.apache.metron.threatintel.ThreatIntelAdapter"
+ configMethods:
+ - name: "withConfig"
+ args:
+ - ref: "ipThreatIntelConfig"
+ - id: "ipThreatIntelEnrichment"
+ className: "org.apache.metron.domain.Enrichment"
+ constructorArgs:
+ - "ip"
+ - ref: "ipThreatIntelAdapter"
+ - id: "threatIntels"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "ipThreatIntelEnrichment"
+
+#indexing
+ - id: "indexWriter"
+ className: "org.apache.metron.writer.ElasticsearchWriter"
+ constructorArgs:
+ - "${es.clustername}"
+ - "${es.ip}"
+ - ${es.port}
+ - "yyyy.MM.dd.hh"
+
+#kafka/zookeeper
+ - id: "zkHosts"
+ className: "storm.kafka.ZkHosts"
+ constructorArgs:
+ - "${kafka.zk}"
+ - id: "kafkaConfig"
+ className: "storm.kafka.SpoutConfig"
+ constructorArgs:
+ # zookeeper hosts
+ - ref: "zkHosts"
+ # topic name
+ - "enrichments"
+ # zk root
+ - ""
+ # id
+ - "enrichments"
+ properties:
+ - name: "ignoreZkOffsets"
+ value: true
+ - name: "startOffsetTime"
+ value: -1
+
+spouts:
+ - id: "testingSpout"
+ className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
+ parallelism: 1
+ configMethods:
+ - name: "withFilename"
+ args:
+ - "SampleInput/YafExampleOutput"
+ - name: "withRepeating"
+ args:
+ - true
+ - id: "kafkaSpout"
+ className: "storm.kafka.KafkaSpout"
+ constructorArgs:
+ - ref: "kafkaConfig"
+bolts:
+# Enrichment Bolts
+ - id: "enrichmentSplitBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - id: "geoEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "geoEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "hostEnrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "hostEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "enrichmentJoinBolt"
+ className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+
+# Threat Intel Bolts
+ - id: "threatIntelSplitBolt"
+ className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "threatIntels"
+ - name: "withMessageFieldName"
+ args: ["message"]
+ - id: "ipThreatIntelBolt"
+ className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichment"
+ args:
+ - ref: "ipThreatIntelEnrichment"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - id: "threatIntelJoinBolt"
+ className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "threatIntels"
+ - name: "withMaxCacheSize"
+ args: [10000]
+ - name: "withMaxTimeRetain"
+ args: [10]
+# Indexing Bolts
+ - id: "indexingBolt"
+ className: "org.apache.metron.bolt.BulkMessageWriterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withBulkMessageWriter"
+ args:
+ - ref: "indexWriter"
+
+
+streams:
+#parser
+ - name: "spout -> enrichmentSplit"
+ from: "kafkaSpout"
+ to: "enrichmentSplitBolt"
+ grouping:
+ type: SHUFFLE
+
+#enrichment
+ - name: "enrichmentSplit -> host"
+ from: "enrichmentSplitBolt"
+ to: "hostEnrichmentBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
+ - name: "enrichmentSplit -> geo"
+ from: "enrichmentSplitBolt"
+ to: "geoEnrichmentBolt"
+ grouping:
+ streamId: "geo"
+ type: FIELDS
+ args: ["key"]
+ - name: "splitter -> join"
+ from: "enrichmentSplitBolt"
+ to: "enrichmentJoinBolt"
+ grouping:
+ streamId: "message"
+ type: FIELDS
+ args: ["key"]
+ - name: "geo -> join"
+ from: "geoEnrichmentBolt"
+ to: "enrichmentJoinBolt"
+ grouping:
+ streamId: "geo"
+ type: FIELDS
+ args: ["key"]
+ - name: "host -> join"
+ from: "hostEnrichmentBolt"
+ to: "enrichmentJoinBolt"
+ grouping:
+ streamId: "host"
+ type: FIELDS
+ args: ["key"]
+
+#threat intel
+ - name: "enrichmentJoin -> threatSplit"
+ from: "enrichmentJoinBolt"
+ to: "threatIntelSplitBolt"
+ grouping:
+ streamId: "message"
+ type: FIELDS
+ args: ["key"]
+
+ - name: "threatSplit -> ip"
+ from: "threatIntelSplitBolt"
+ to: "ipThreatIntelBolt"
+ grouping:
+ streamId: "ip"
+ type: FIELDS
+ args: ["key"]
+
+ - name: "ip -> join"
+ from: "ipThreatIntelBolt"
+ to: "threatIntelJoinBolt"
+ grouping:
+ streamId: "ip"
+ type: FIELDS
+ args: ["key"]
+ - name: "threatIntelSplit -> threatIntelJoin"
+ from: "threatIntelSplitBolt"
+ to: "threatIntelJoinBolt"
+ grouping:
+ streamId: "message"
+ type: FIELDS
+ args: ["key"]
+#indexing
+ - name: "threatIntelJoin -> indexing"
+ from: "threatIntelJoinBolt"
+ to: "indexingBolt"
+ grouping:
+ streamId: "message"
+ type: FIELDS
+ args: ["key"]
+ - name: "indexingBolt -> errorIndexingBolt"
+ from: "indexingBolt"
+ to: "indexingBolt"
+ grouping:
+ streamId: "error"
+ type: SHUFFLE