You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/01 12:25:57 UTC
[metron] branch master updated: METRON-2164 Remove the Split-Join
Enrichment Topology (nickwallen) closes apache/metron#1448
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 02f88a3 METRON-2164 Remove the Split-Join Enrichment Topology (nickwallen) closes apache/metron#1448
02f88a3 is described below
commit 02f88a313068a876e20227a2347cedafb4cec0ae
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Mon Jul 1 08:25:35 2019 -0400
METRON-2164 Remove the Split-Join Enrichment Topology (nickwallen) closes apache/metron#1448
---
Upgrading.md | 5 +
metron-deployment/packaging/ambari/.gitignore | 2 -
.../packaging/ambari/metron-mpack/pom.xml | 6 +-
.../configuration/metron-enrichment-env.xml | 82 ---
.../CURRENT/package/scripts/enrichment_commands.py | 15 +-
.../CURRENT/package/scripts/enrichment_master.py | 9 +-
.../CURRENT/package/scripts/params/params_linux.py | 12 -
.../METRON/CURRENT/themes/metron_theme.json | 63 +--
.../packaging/docker/rpm-docker/SPECS/metron.spec | 6 +-
metron-platform/Performance-tuning-guide.md | 10 +-
.../metron-enrichment-storm/README.md | 15 +-
.../metron-enrichment-storm/enrichment_arch.png | Bin 113606 -> 0 bytes
.../main/config/enrichment-splitjoin.properties | 63 ---
.../main/config/enrichment-splitjoin.properties.j2 | 63 ---
...nt-unified.properties => enrichment.properties} | 0
...fied.properties.j2 => enrichment.properties.j2} | 0
.../src/main/flux/enrichment/remote-splitjoin.yaml | 593 ---------------------
.../{remote-unified.yaml => remote.yaml} | 0
.../metron/enrichment/bolt/EnrichmentJoinBolt.java | 122 -----
.../enrichment/bolt/EnrichmentSplitterBolt.java | 184 -------
.../apache/metron/enrichment/bolt/JoinBolt.java | 189 -------
.../apache/metron/enrichment/bolt/SplitBolt.java | 113 ----
.../enrichment/bolt/ThreatIntelJoinBolt.java | 123 -----
.../enrichment/bolt/ThreatIntelSplitterBolt.java | 67 ---
.../src/main/scripts/start_enrichment_topology.sh | 7 +-
.../enrichment/bolt/EnrichmentJoinBoltTest.java | 96 ----
.../bolt/EnrichmentSplitterBoltTest.java | 106 ----
.../metron/enrichment/bolt/JoinBoltTest.java | 193 -------
.../metron/enrichment/bolt/SplitBoltTest.java | 123 -----
.../enrichment/bolt/ThreatIntelJoinBoltTest.java | 222 --------
.../bolt/ThreatIntelSplitterBoltTest.java | 46 --
.../integration/EnrichmentIntegrationTest.java | 87 +--
.../UnifiedEnrichmentIntegrationTest.java | 96 ----
33 files changed, 70 insertions(+), 2648 deletions(-)
diff --git a/Upgrading.md b/Upgrading.md
index d59aa57..66fe9c3 100644
--- a/Upgrading.md
+++ b/Upgrading.md
@@ -19,6 +19,11 @@ limitations under the License.
This document constitutes a per-version listing of changes of
configuration which are non-backwards compatible.
+## 0.7.1 to 0.7.2
+
+### [METRON-2164: Remove the Split-Join Enrichment Topology](https://issues.apache.org/jira/browse/METRON-2164)
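+The Split-Join Enrichment topology, which had been deprecated since November 2018, has been removed. Metron has defaulted to using the Unified Enrichment topology since that time. All users of the Split-Join Enrichment topology must migrate to the Unified Enrichment topology. Both topologies provide equivalent functionality. As part of this change, the Unified topology's files `remote-unified.yaml` and `enrichment-unified.properties` have been renamed to `remote.yaml` and `enrichment.properties`.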
+
## 0.7.0 to 0.7.1
### [METRON-2100: Update developer documentation for full dev management UI parser aggregation feature gap](https://issues.apache.org/jira/browse/METRON-2100)
diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore
index 242a4da..ab5dc85 100644
--- a/metron-deployment/packaging/ambari/.gitignore
+++ b/metron-deployment/packaging/ambari/.gitignore
@@ -4,6 +4,4 @@ elasticsearch.properties.j2
solr.properties.j2
hdfs.properties.j2
enrichment.properties.j2
-enrichment-splitjoin.properties.j2
-enrichment-unified.properties.j2
pcap.properties.j2
diff --git a/metron-deployment/packaging/ambari/metron-mpack/pom.xml b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
index ae62bd8..35bf369 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/pom.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
@@ -110,8 +110,7 @@
<resource>
<directory>${basedir}/../../../../metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config</directory>
<includes>
- <include>enrichment-splitjoin.properties.j2</include>
- <include>enrichment-unified.properties.j2</include>
+ <include>enrichment.properties.j2</include>
</includes>
<filtering>false</filtering>
</resource>
@@ -185,8 +184,7 @@
<fileset>
<directory>${basedir}/src/main/resources/common-services/METRON/CURRENT/package/templates</directory>
<includes>
- <include>enrichment-unified.properties.j2</include>
- <include>enrichment-splitjoin.properties.j2</include>
+ <include>enrichment.properties.j2</include>
<include>elasticsearch.properties.j2</include>
<include>hdfs.properties.j2</include>
</includes>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml
index 1fd4702..9fe29e3 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml
@@ -193,88 +193,6 @@
<empty-value-valid>true</empty-value-valid>
</value-attributes>
</property>
- <property>
- <name>enrichment_topology</name>
- <description>Which Enrichment topology to execute. Note: Split-Join is deprecated in favor of the Unified topology.</description>
- <value>Unified</value>
- <display-name>Enrichment Topology</display-name>
- <value-attributes>
- <type>value-list</type>
- <entries>
- <entry>
- <value>Unified</value>
- </entry>
- <entry>
- <value>Split-Join</value>
- </entry>
- </entries>
- <selection-cardinality>1</selection-cardinality>
- </value-attributes>
- </property>
-
- <!--
- split-join topology parameters
- -->
- <property>
- <name>enrichment_join_cache_size</name>
- <description>Enrichment join bolt max cache size for the Split Join Enrichment Topology</description>
- <value>100000</value>
- <display-name>Enrichment Join Max Cache Size</display-name>
- </property>
- <property>
- <name>threatintel_join_cache_size</name>
- <description>Threat Intel join bolt max cache size for the Split Join Enrichment Topology</description>
- <value>100000</value>
- <display-name>Threat Intel Join Max Cache Size</display-name>
- </property>
- <property>
- <name>enrichment_kafka_spout_parallelism</name>
- <description>Kafka spout parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Enrichment Spout Parallelism</display-name>
- </property>
- <property>
- <name>enrichment_split_parallelism</name>
- <description>Enrichment split bolt parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Enrichment Split Parallelism</display-name>
- </property>
- <property>
- <name>enrichment_stellar_parallelism</name>
- <description>Stellar enrichment bolt parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Stellar Enrichment Parallelism</display-name>
- </property>
- <property>
- <name>enrichment_join_parallelism</name>
- <description>Enrichment join bolt parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Enrichment Join Parallelism</display-name>
- </property>
- <property>
- <name>threat_intel_split_parallelism</name>
- <description>Threat Intel split bolt parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Threat Intel Split Parallelism</display-name>
- </property>
- <property>
- <name>threat_intel_stellar_parallelism</name>
- <description>Threat Intel stellar bolt parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Threat Intel Stellar Parallelism</display-name>
- </property>
- <property>
- <name>threat_intel_join_parallelism</name>
- <description>Threat Intel join bolt parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Threat Intel Join Parallelism</display-name>
- </property>
- <property>
- <name>kafka_writer_parallelism</name>
- <description>Kafka writer parallelism for the Split Join Enrichment Topology</description>
- <value>1</value>
- <display-name>Enrichment Kafka Writer Parallelism</display-name>
- </property>
<!--
unified topology parameters
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 5d82c8c..80cc1fa 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -146,20 +146,9 @@ class EnrichmentCommands:
def start_enrichment_topology(self, env):
Logger.info("Starting Metron enrichment topology: {0}".format(self.__enrichment_topology))
-
if not self.is_topology_active(env):
-
- # which enrichment topology needs started?
- if self.__params.enrichment_topology == "Unified":
- topology_flux = "{0}/flux/enrichment/remote-unified.yaml".format(self.__params.metron_home)
- topology_props = "{0}/config/enrichment-unified.properties".format(self.__params.metron_home)
- elif self.__params.enrichment_topology == "Split-Join":
- topology_flux = "{0}/flux/enrichment/remote-splitjoin.yaml".format(self.__params.metron_home)
- topology_props = "{0}/config/enrichment-splitjoin.properties".format(self.__params.metron_home)
- else:
- raise Fail("Unexpected enrichment topology; name=" + self.__params.enrichment_topology)
-
- # start the topology
+ topology_flux = "{0}/flux/enrichment/remote.yaml".format(self.__params.metron_home)
+ topology_props = "{0}/config/enrichment.properties".format(self.__params.metron_home)
start_cmd_template = """{0}/bin/start_enrichment_topology.sh --remote {1} --filter {2}"""
Logger.info('Starting ' + self.__enrichment_topology)
start_cmd = start_cmd_template.format(self.__params.metron_home, topology_flux, topology_props)
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
index ff89b6b..cd54a6d 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
@@ -38,16 +38,11 @@ class Enrichment(Script):
env.set_params(params)
Logger.info("Running enrichment configure")
- File(format("{metron_config_path}/enrichment-splitjoin.properties"),
- content=Template("enrichment-splitjoin.properties.j2"),
+ File(format("{metron_config_path}/enrichment.properties"),
+ content=Template("enrichment.properties.j2"),
owner=params.metron_user,
group=params.metron_group)
- File(format("{metron_config_path}/enrichment-unified.properties"),
- content=Template("enrichment-unified.properties.j2"),
- owner=params.metron_user,
- group=params.metron_group)
-
if not metron_service.is_zk_configured(params):
metron_service.init_zk_config(params)
metron_service.set_zk_configured(params)
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index a7f20fc..09205bc 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -345,18 +345,6 @@ enrichment_topology_worker_childopts += config['configurations']['metron-enrichm
enrichment_topology_max_spout_pending = config['configurations']['metron-enrichment-env']['enrichment_topology_max_spout_pending']
enrichment_topology = config['configurations']['metron-enrichment-env']['enrichment_topology']
-# Enrichment - Split Join topology
-enrichment_join_cache_size = config['configurations']['metron-enrichment-env']['enrichment_join_cache_size']
-threatintel_join_cache_size = config['configurations']['metron-enrichment-env']['threatintel_join_cache_size']
-enrichment_kafka_spout_parallelism = config['configurations']['metron-enrichment-env']['enrichment_kafka_spout_parallelism']
-enrichment_split_parallelism = config['configurations']['metron-enrichment-env']['enrichment_split_parallelism']
-enrichment_stellar_parallelism = config['configurations']['metron-enrichment-env']['enrichment_stellar_parallelism']
-enrichment_join_parallelism = config['configurations']['metron-enrichment-env']['enrichment_join_parallelism']
-threat_intel_split_parallelism = config['configurations']['metron-enrichment-env']['threat_intel_split_parallelism']
-threat_intel_stellar_parallelism = config['configurations']['metron-enrichment-env']['threat_intel_stellar_parallelism']
-threat_intel_join_parallelism = config['configurations']['metron-enrichment-env']['threat_intel_join_parallelism']
-kafka_writer_parallelism = config['configurations']['metron-enrichment-env']['kafka_writer_parallelism']
-
# Enrichment - Unified topology
unified_kafka_spout_parallelism = config['configurations']['metron-enrichment-env']['unified_kafka_spout_parallelism']
unified_enrichment_parallelism = config['configurations']['metron-enrichment-env']['unified_enrichment_parallelism']
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index a9b7322..41fd044 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -65,7 +65,7 @@
"display-name": "Enrichment",
"layout": {
"tab-columns": "1",
- "tab-rows": "5",
+ "tab-rows": "4",
"sections": [
{
"name": "section-enrichment-adapters",
@@ -135,26 +135,7 @@
"subsections": [
{
"name": "subsection-enrichment-unified",
- "display-name": "Unified Topology",
- "row-index": "0",
- "column-index": "0",
- "row-span": "1",
- "column-span": "1"
- }
- ]
- },
- {
- "name": "section-enrichment-splitjoin",
- "row-index": "4",
- "column-index": "0",
- "row-span": "1",
- "column-span": "1",
- "section-columns": "1",
- "section-rows": "1",
- "subsections": [
- {
- "name": "subsection-enrichment-splitjoin",
- "display-name": "Split Join Topology",
+ "display-name": "Topology",
"row-index": "0",
"column-index": "0",
"row-span": "1",
@@ -611,46 +592,6 @@
"subsection-name": "subsection-enrichment-storm"
},
{
- "config": "metron-enrichment-env/enrichment_join_cache_size",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/threatintel_join_cache_size",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/enrichment_kafka_spout_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/enrichment_split_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/enrichment_stellar_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/enrichment_join_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/threat_intel_split_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/threat_intel_stellar_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/threat_intel_join_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
- "config": "metron-enrichment-env/kafka_writer_parallelism",
- "subsection-name": "subsection-enrichment-splitjoin"
- },
- {
"config": "metron-enrichment-env/unified_kafka_spout_parallelism",
"subsection-name": "subsection-enrichment-unified"
},
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 8cd9bef..e12f6cd 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -388,10 +388,8 @@ This package installs the Metron Enrichment Storm files
%dir %{metron_home}/flux
%dir %{metron_home}/flux/enrichment
%{metron_home}/bin/start_enrichment_topology.sh
-%{metron_home}/config/enrichment-splitjoin.properties
-%{metron_home}/config/enrichment-unified.properties
-%{metron_home}/flux/enrichment/remote-splitjoin.yaml
-%{metron_home}/flux/enrichment/remote-unified.yaml
+%{metron_home}/config/enrichment.properties
+%{metron_home}/flux/enrichment/remote.yaml
%attr(0644,root,root) %{metron_home}/lib/metron-enrichment-storm-%{full_version}-uber.jar
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md
index 5203d5f..8829abb 100644
--- a/metron-platform/Performance-tuning-guide.md
+++ b/metron-platform/Performance-tuning-guide.md
@@ -188,15 +188,14 @@ See more detail on starting parsers [here](https://github.com/apache/metron/blob
**Enrichment**
-__Note__ These recommendations are based on the deprecated split-join enrichment topology. See [Enrichment Performance](metron-enrichment/Performance.md) for tuning recommendations for the new default unified enrichment topology.
+See [Enrichment Performance](metron-enrichment/Performance.md) for tuning recommendations for the enrichment topology.
This is a mapping of the various performance tuning properties for enrichments and how they are materialized.
-Flux file found here - $METRON_HOME/flux/enrichment/remote-splitjoin.yaml
+Flux file found here - $METRON_HOME/flux/enrichment/remote.yaml
_Note 1:_ Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.
-_Note 2:_ Many of these settings will be irrelevant in the alternate non-split-join topology
| Category | Ambari Property Name | enrichment.properties property | Flux Property | Flux Section Location | Storm Property Name | Notes |
|-----------------------------|--------------------------------------------|--------------------------------------------------------|--------------------------------------------------------|-------------------------------------|---------------------------------|----------------------------------------|
@@ -209,11 +208,6 @@ _Note 2:_ Many of these settings will be irrelevant in the alternate non-split-j
| | n/a | n/a | setPollTimeoutMs | line 230, id: kafkaConfig | n/a | Kafka consumer client property |
| | n/a | n/a | setMaxUncommittedOffsets | line 230, id: kafkaConfig | n/a | Kafka consumer client property |
| | n/a | n/a | setOffsetCommitPeriodMs | line 230, id: kafkaConfig | n/a | Kafka consumer client property |
-| Enrichment splitter | enrichment_split_parallelism | enrichment.split.parallelism | parallelism | line 253, id: enrichmentSplitBolt | n/a | |
-| Enrichment joiner | enrichment_join_parallelism | enrichment.join.parallelism | parallelism | line 316, id: enrichmentJoinBolt | n/a | |
-| Threat intel splitter | threat_intel_split_parallelism | threat.intel.split.parallelism | parallelism | line 338, id: threatIntelSplitBolt | n/a | |
-| Threat intel joiner | threat_intel_join_parallelism | threat.intel.join.parallelism | parallelism | line 376, id: threatIntelJoinBolt | n/a | |
-| Output bolt | kafka_writer_parallelism | kafka.writer.parallelism | parallelism | line 397, id: outputBolt | n/a | |
When adding Kafka spout properties, there are 3 ways you'll do this.
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/README.md b/metron-platform/metron-enrichment/metron-enrichment-storm/README.md
index 0766a29..47d5c35 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/README.md
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/README.md
@@ -43,7 +43,7 @@ to split across multiple bolts).
There are two parameters which you might want to tune in this topology.
Both of them are topology configuration adjustable in the flux file
-`$METRON_HOME/config/flux/enrichment/remote-unified.yaml`:
+`$METRON_HOME/flux/enrichment/remote.yaml`:
* `metron.threadpool.size` : The size of the threadpool. This can take a number or a multiple of the number of cores (e.g. `5C` to 5 times the number of cores). The default is `2C`.
* `metron.threadpool.type` : The type of threadpool. (note: descriptions taken from [here](https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/)).
* `FIXED` is a fixed threadpool of size `n`. `n` threads will process tasks at the time, when the pool is saturated, new tasks will get added to a queue without a limit on size. Good for CPU intensive tasks. This is the default.
@@ -54,19 +54,6 @@ intel bolt, the configurations will be taken from the respective join bolt
parallelism. When proper ambari support for this is added, we will add
its own property.
-### Split-Join Enrichment Topology
-
-The now-deprecated split/join topology is also available and performs enrichments in parallel.
-This poses some issues in terms of ease of tuning and reasoning about performance.
-
-![Architecture](enrichment_arch.png)
-
-#### Using It
-
-In order to use the older, deprecated topology, you will need to
-* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-splitjoin.yaml` instead of `remote-unified.yaml`
-* Restart the enrichment topology.
-
## Enrichment Configuration
The configuration for the `enrichment` topology, the topology primarily
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/enrichment_arch.png b/metron-platform/metron-enrichment/metron-enrichment-storm/enrichment_arch.png
deleted file mode 100644
index 3b8bcdb..0000000
Binary files a/metron-platform/metron-enrichment/metron-enrichment-storm/enrichment_arch.png and /dev/null differ
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties
deleted file mode 100644
index 109c2ee..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-##### Storm #####
-enrichment.workers=1
-enrichment.acker.executors=0
-topology.worker.childopts=
-topology.auto-credentials=
-topology.max.spout.pending=500
-
-##### Kafka #####
-kafka.zk=node1:2181
-kafka.broker=node1:6667
-kafka.security.protocol=PLAINTEXT
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start=UNCOMMITTED_EARLIEST
-
-enrichment.input.topic=enrichments
-enrichment.output.topic=indexing
-enrichment.error.topic=indexing
-threat.intel.error.topic=indexing
-
-##### JoinBolt #####
-enrichment.join.cache.size=100000
-threat.intel.join.cache.size=100000
-
-##### Enrichment #####
-hbase.provider.impl=org.apache.metron.hbase.HTableProvider
-enrichment.simple.hbase.table=enrichment
-enrichment.simple.hbase.cf=t
-enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
-{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
-{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
-
-##### Threat Intel #####
-threat.intel.tracker.table=access_tracker
-threat.intel.tracker.cf=t
-threat.intel.simple.hbase.table=threatintel
-threat.intel.simple.hbase.cf=t
-
-##### Parallelism #####
-kafka.spout.parallelism=1
-enrichment.split.parallelism=1
-enrichment.stellar.parallelism=1
-enrichment.join.parallelism=1
-threat.intel.split.parallelism=1
-threat.intel.stellar.parallelism=1
-threat.intel.join.parallelism=1
-kafka.writer.parallelism=1
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties.j2 b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties.j2
deleted file mode 100755
index a0b21c9..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties.j2
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-##### Storm #####
-enrichment.workers={{enrichment_workers}}
-enrichment.acker.executors={{enrichment_acker_executors}}
-topology.worker.childopts={{enrichment_topology_worker_childopts}}
-topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{enrichment_topology_max_spout_pending}}
-
-##### Kafka #####
-kafka.zk={{zookeeper_quorum}}
-kafka.broker={{kafka_brokers}}
-kafka.security.protocol={{kafka_security_protocol}}
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{enrichment_kafka_start}}
-
-enrichment.input.topic={{enrichment_input_topic}}
-enrichment.output.topic={{enrichment_output_topic}}
-enrichment.error.topic={{enrichment_error_topic}}
-threat.intel.error.topic={{threatintel_error_topic}}
-
-##### JoinBolt #####
-enrichment.join.cache.size={{enrichment_join_cache_size}}
-threat.intel.join.cache.size={{threatintel_join_cache_size}}
-
-##### Enrichment #####
-hbase.provider.impl={{enrichment_hbase_provider_impl}}
-enrichment.simple.hbase.table={{enrichment_hbase_table}}
-enrichment.simple.hbase.cf={{enrichment_hbase_cf}}
-enrichment.host.known_hosts={{enrichment_host_known_hosts}}
-
-##### Threat Intel #####
-threat.intel.tracker.table={{threatintel_hbase_table}}
-threat.intel.tracker.cf={{threatintel_hbase_cf}}
-threat.intel.simple.hbase.table={{threatintel_hbase_table}}
-threat.intel.simple.hbase.cf={{threatintel_hbase_cf}}
-
-##### Parallelism #####
-kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}}
-enrichment.split.parallelism={{enrichment_split_parallelism}}
-enrichment.stellar.parallelism={{enrichment_stellar_parallelism}}
-enrichment.join.parallelism={{enrichment_join_parallelism}}
-threat.intel.split.parallelism={{threat_intel_split_parallelism}}
-threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}}
-threat.intel.join.parallelism={{threat_intel_join_parallelism}}
-kafka.writer.parallelism={{kafka_writer_parallelism}}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties
similarity index 100%
rename from metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties
rename to metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties.j2 b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2
similarity index 100%
rename from metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties.j2
rename to metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-splitjoin.yaml b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-splitjoin.yaml
deleted file mode 100644
index 8a1bba1..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-splitjoin.yaml
+++ /dev/null
@@ -1,593 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "enrichment"
-config:
- topology.workers: ${enrichment.workers}
- topology.acker.executors: ${enrichment.acker.executors}
- topology.worker.childopts: ${topology.worker.childopts}
- topology.auto-credentials: ${topology.auto-credentials}
- topology.max.spout.pending: ${topology.max.spout.pending}
-
-components:
-
-# Enrichment
- - id: "stellarEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
- configMethods:
- - name: "ofType"
- args:
- - "ENRICHMENT"
-
- # Any kafka props for the producer go here.
- - id: "kafkaWriterProps"
- className: "java.util.HashMap"
- configMethods:
- - name: "put"
- args:
- - "security.protocol"
- - "${kafka.security.protocol}"
-
- - id: "stellarEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "stellar"
- - ref: "stellarEnrichmentAdapter"
-
- - id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
- - id: "geoEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "geo"
- - ref: "geoEnrichmentAdapter"
- - id: "hostEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
- constructorArgs:
- - '${enrichment.host.known_hosts}'
- - id: "hostEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "host"
- - ref: "hostEnrichmentAdapter"
-
- - id: "simpleHBaseEnrichmentConfig"
- className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
- configMethods:
- - name: "withProviderImpl"
- args:
- - "${hbase.provider.impl}"
- - name: "withHBaseTable"
- args:
- - "${enrichment.simple.hbase.table}"
- - name: "withHBaseCF"
- args:
- - "${enrichment.simple.hbase.cf}"
- - id: "simpleHBaseEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
- configMethods:
- - name: "withConfig"
- args:
- - ref: "simpleHBaseEnrichmentConfig"
- - id: "simpleHBaseEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "hbaseEnrichment"
- - ref: "simpleHBaseEnrichmentAdapter"
- - id: "enrichments"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - ref: "geoEnrichment"
- - name: "add"
- args:
- - ref: "hostEnrichment"
- - name: "add"
- args:
- - ref: "simpleHBaseEnrichment"
- - name: "add"
- args:
- - ref: "stellarEnrichment"
-
- #enrichment error
- - id: "enrichmentErrorKafkaWriter"
- className: "org.apache.metron.writer.kafka.KafkaWriter"
- configMethods:
- - name: "withTopic"
- args:
- - "${enrichment.error.topic}"
- - name: "withZkQuorum"
- args:
- - "${kafka.zk}"
- - name: "withProducerConfigs"
- args:
- - ref: "kafkaWriterProps"
-
-# Threat Intel
- - id: "stellarThreatIntelAdapter"
- className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
- configMethods:
- - name: "ofType"
- args:
- - "THREAT_INTEL"
- - id: "stellarThreatIntelEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "stellar"
- - ref: "stellarThreatIntelAdapter"
- - id: "simpleHBaseThreatIntelConfig"
- className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
- configMethods:
- - name: "withProviderImpl"
- args:
- - "${hbase.provider.impl}"
- - name: "withTrackerHBaseTable"
- args:
- - "${threat.intel.tracker.table}"
- - name: "withTrackerHBaseCF"
- args:
- - "${threat.intel.tracker.cf}"
- - name: "withHBaseTable"
- args:
- - "${threat.intel.simple.hbase.table}"
- - name: "withHBaseCF"
- args:
- - "${threat.intel.simple.hbase.cf}"
- - id: "simpleHBaseThreatIntelAdapter"
- className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
- configMethods:
- - name: "withConfig"
- args:
- - ref: "simpleHBaseThreatIntelConfig"
- - id: "simpleHBaseThreatIntelEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "hbaseThreatIntel"
- - ref: "simpleHBaseThreatIntelAdapter"
-
- - id: "threatIntels"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - ref: "simpleHBaseThreatIntelEnrichment"
- - name: "add"
- args:
- - ref: "stellarThreatIntelEnrichment"
-
- #threatintel error
- - id: "threatIntelErrorKafkaWriter"
- className: "org.apache.metron.writer.kafka.KafkaWriter"
- configMethods:
- - name: "withTopic"
- args:
- - "${threat.intel.error.topic}"
- - name: "withZkQuorum"
- args:
- - "${kafka.zk}"
- - name: "withProducerConfigs"
- args:
- - ref: "kafkaWriterProps"
-#indexing
- - id: "kafkaWriter"
- className: "org.apache.metron.writer.kafka.KafkaWriter"
- configMethods:
- - name: "withTopic"
- args:
- - "${enrichment.output.topic}"
- - name: "withZkQuorum"
- args:
- - "${kafka.zk}"
- - name: "withProducerConfigs"
- args:
- - ref: "kafkaWriterProps"
-
-#kafka/zookeeper
- # Any kafka props for the consumer go here.
- - id: "kafkaProps"
- className: "java.util.HashMap"
- configMethods:
- - name: "put"
- args:
- - "value.deserializer"
- - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- - name: "put"
- args:
- - "key.deserializer"
- - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- - name: "put"
- args:
- - "group.id"
- - "enrichments"
- - name: "put"
- args:
- - "security.protocol"
- - "${kafka.security.protocol}"
-
-
- # The fields to pull out of the kafka messages
- - id: "fields"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - "value"
-
- - id: "kafkaConfig"
- className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
- constructorArgs:
- - ref: "kafkaProps"
- # topic name
- - "${enrichment.input.topic}"
- - "${kafka.zk}"
- - ref: "fields"
- configMethods:
- - name: "setFirstPollOffsetStrategy"
- args:
- - "${kafka.start}"
-
-
-spouts:
- - id: "kafkaSpout"
- className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
- constructorArgs:
- - ref: "kafkaConfig"
- parallelism: ${kafka.spout.parallelism}
-
-bolts:
-# Enrichment Bolts
- - id: "enrichmentSplitBolt"
- className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- parallelism: ${enrichment.split.parallelism}
-
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "geoEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
- - id: "stellarEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "stellarEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${enrichment.stellar.parallelism}
-
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "hostEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
- - id: "simpleHBaseEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "simpleHBaseEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
- - id: "enrichmentJoinBolt"
- className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMaxCacheSize"
- args: [${enrichment.join.cache.size}]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${enrichment.join.parallelism}
-
- - id: "enrichmentErrorOutputBolt"
- className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
- constructorArgs:
- - "${kafka.zk}"
- - "ENRICHMENT"
- configMethods:
- - name: "withBulkMessageWriter"
- args:
- - ref: "enrichmentErrorKafkaWriter"
-
-
-# Threat Intel Bolts
- - id: "threatIntelSplitBolt"
- className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichments"
- args:
- - ref: "threatIntels"
- - name: "withMessageFieldName"
- args: ["message"]
- parallelism: ${threat.intel.split.parallelism}
-
- - id: "simpleHBaseThreatIntelBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "simpleHBaseThreatIntelEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- - id: "stellarThreatIntelBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "stellarThreatIntelEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${threat.intel.stellar.parallelism}
-
- - id: "threatIntelJoinBolt"
- className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMaxCacheSize"
- args: [${threat.intel.join.cache.size}]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${threat.intel.join.parallelism}
-
- - id: "threatIntelErrorOutputBolt"
- className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
- constructorArgs:
- - "${kafka.zk}"
- - "ENRICHMENT"
- configMethods:
- - name: "withBulkMessageWriter"
- args:
- - ref: "threatIntelErrorKafkaWriter"
-
-# Indexing Bolts
- - id: "outputBolt"
- className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
- constructorArgs:
- - "${kafka.zk}"
- - "ENRICHMENT"
- configMethods:
- - name: "withBulkMessageWriter"
- args:
- - ref: "kafkaWriter"
- parallelism: ${kafka.writer.parallelism}
-
-
-streams:
-#parser
- - name: "spout -> enrichmentSplit"
- from: "kafkaSpout"
- to: "enrichmentSplitBolt"
- grouping:
- type: LOCAL_OR_SHUFFLE
-
-#enrichment
- - name: "enrichmentSplit -> host"
- from: "enrichmentSplitBolt"
- to: "hostEnrichmentBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["message"]
-
- - name: "enrichmentSplit -> geo"
- from: "enrichmentSplitBolt"
- to: "geoEnrichmentBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["message"]
-
- - name: "enrichmentSplit -> stellar"
- from: "enrichmentSplitBolt"
- to: "stellarEnrichmentBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["message"]
-
-
- - name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
- from: "enrichmentSplitBolt"
- to: "simpleHBaseEnrichmentBolt"
- grouping:
- streamId: "hbaseEnrichment"
- type: FIELDS
- args: ["message"]
-
- - name: "splitter -> join"
- from: "enrichmentSplitBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
-
- - name: "geo -> join"
- from: "geoEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["key"]
-
- - name: "stellar -> join"
- from: "stellarEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["key"]
-
- - name: "simpleHBaseEnrichmentBolt -> join"
- from: "simpleHBaseEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "hbaseEnrichment"
- type: FIELDS
- args: ["key"]
-
- - name: "host -> join"
- from: "hostEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["key"]
-
- # Error output
- - name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "geoEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "stellarEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "hostEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "simpleHBaseEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
-#threat intel
- - name: "enrichmentJoin -> threatSplit"
- from: "enrichmentJoinBolt"
- to: "threatIntelSplitBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
-
- - name: "threatSplit -> simpleHBaseThreatIntel"
- from: "threatIntelSplitBolt"
- to: "simpleHBaseThreatIntelBolt"
- grouping:
- streamId: "hbaseThreatIntel"
- type: FIELDS
- args: ["message"]
-
- - name: "threatSplit -> stellarThreatIntel"
- from: "threatIntelSplitBolt"
- to: "stellarThreatIntelBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["message"]
-
-
- - name: "simpleHBaseThreatIntel -> join"
- from: "simpleHBaseThreatIntelBolt"
- to: "threatIntelJoinBolt"
- grouping:
- streamId: "hbaseThreatIntel"
- type: FIELDS
- args: ["key"]
-
- - name: "stellarThreatIntel -> join"
- from: "stellarThreatIntelBolt"
- to: "threatIntelJoinBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["key"]
-
- - name: "threatIntelSplit -> threatIntelJoin"
- from: "threatIntelSplitBolt"
- to: "threatIntelJoinBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
-#output
- - name: "threatIntelJoin -> output"
- from: "threatIntelJoinBolt"
- to: "outputBolt"
- grouping:
- streamId: "message"
- type: LOCAL_OR_SHUFFLE
-
- # Error output
- - name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt"
- from: "simpleHBaseThreatIntelBolt"
- to: "threatIntelErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt"
- from: "stellarThreatIntelBolt"
- to: "threatIntelErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-unified.yaml b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml
similarity index 100%
rename from metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-unified.yaml
rename to metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
deleted file mode 100644
index 671e6b8..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.storm.task.TopologyContext;
-import com.google.common.base.Joiner;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
-
- protected static final Logger LOG = LoggerFactory
- .getLogger(EnrichmentJoinBolt.class);
-
- public EnrichmentJoinBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext) {
-
- }
-
- @Override
- public Set<String> getStreamIds(JSONObject message) {
- Set<String> streamIds = new HashSet<>();
- String sourceType = MessageUtils.getSensorType(message);
- if(sourceType == null) {
- String errorMessage = "Unable to find source type for message: " + message;
- throw new IllegalStateException(errorMessage);
- }
- Map<String, Object> fieldMap = getFieldMap(sourceType);
- Map<String, ConfigHandler> handlerMap = getFieldToHandlerMap(sourceType);
- if(fieldMap != null) {
- for (String enrichmentType : fieldMap.keySet()) {
- ConfigHandler handler = handlerMap.get(enrichmentType);
- List<String> subgroups = handler.getType().getSubgroups(handler.getType().toConfig(handler.getConfig()));
- for(String subgroup : subgroups) {
- streamIds.add(Joiner.on(":").join(enrichmentType, subgroup));
- }
- }
- }
- streamIds.add("message:");
- return streamIds;
- }
-
-
- @Override
- public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
- JSONObject message = new JSONObject();
- for (String key : streamMessageMap.keySet()) {
- Tuple tuple = streamMessageMap.get(key);
- JSONObject obj = (JSONObject) messageGetStrategy.get(tuple);
- message.putAll(obj);
- }
- List<Object> emptyKeys = new ArrayList<>();
- for(Object key : message.keySet()) {
- Object value = message.get(key);
- if(value == null || value.toString().length() == 0) {
- emptyKeys.add(key);
- }
- }
- for(Object o : emptyKeys) {
- message.remove(o);
- }
- message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
- return message;
- }
-
- protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
- if(sensorType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
- if (config != null) {
- return config.getEnrichment().getEnrichmentConfigs();
- } else {
- LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
- }
- } else {
- LOG.error("Trying to retrieve a field map with sensor type of null");
- }
- return new HashMap<>();
- }
-
- public Map<String, Object> getFieldMap(String sourceType) {
- if(sourceType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
- if (config != null && config.getEnrichment() != null) {
- return config.getEnrichment().getFieldMap();
- }
- else {
- LOG.debug("Unable to retrieve a sensor enrichment config of {}", sourceType);
- }
- }
- else {
- LOG.error("Trying to retrieve a field map with source type of null");
- }
- return null;
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
deleted file mode 100644
index 3298c76..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import java.io.UnsupportedEncodingException;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.metron.enrichment.configuration.Enrichment;
-import org.apache.metron.enrichment.utils.EnrichmentUtils;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private List<Enrichment> enrichments;
- protected String messageFieldName;
- private transient JSONParser parser;
-
-
- public EnrichmentSplitterBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- public EnrichmentSplitterBolt withEnrichments(List<Enrichment> enrichments) {
- this.enrichments = enrichments;
- return this;
- }
-
- public EnrichmentSplitterBolt withMessageFieldName(String messageFieldName) {
- this.messageFieldName = messageFieldName;
- return this;
- }
- @Override
- public void prepare(Map map, TopologyContext topologyContext) {
- parser = new JSONParser();
- }
- @Override
- public String getKey(Tuple tuple, JSONObject message) {
- String key = null, guid = null;
- try {
- key = tuple.getStringByField("key");
- guid = (String)message.get(Constants.GUID);
- }
- catch(Throwable t) {
- //swallowing this just in case.
- }
- if(key != null) {
- return key;
- }
- else if(guid != null) {
- return guid;
- }
- else {
- return UUID.randomUUID().toString();
- }
- }
-
- @Override
- public JSONObject generateMessage(Tuple tuple) {
- JSONObject message = null;
- if (messageFieldName == null) {
- byte[] data = tuple.getBinary(0);
- try {
- message = (JSONObject) parser.parse(new String(data, "UTF8"));
- message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
- } catch (ParseException | UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- } else {
- message = (JSONObject) tuple.getValueByField(messageFieldName);
- message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
- }
- return message;
- }
-
- @Override
- public Set<String> getStreamIds() {
- Set<String> streamIds = new HashSet<>();
- for(Enrichment enrichment: enrichments) {
- streamIds.add(enrichment.getType());
- }
- return streamIds;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, List<JSONObject>> splitMessage(JSONObject message) {
- Map<String, List<JSONObject>> streamMessageMap = new HashMap<>();
- String sensorType = MessageUtils.getSensorType(message);
- Map<String, Object> enrichmentFieldMap = getFieldMap(sensorType);
- Map<String, ConfigHandler> fieldToHandler = getFieldToHandlerMap(sensorType);
- Set<String> enrichmentTypes = new HashSet<>(enrichmentFieldMap.keySet());
- enrichmentTypes.addAll(fieldToHandler.keySet());
- for (String enrichmentType : enrichmentTypes) {
- Object fields = enrichmentFieldMap.get(enrichmentType);
- ConfigHandler retriever = fieldToHandler.get(enrichmentType);
-
- List<JSONObject> enrichmentObject = retriever.getType()
- .splitByFields( message
- , fields
- , field -> getKeyName(enrichmentType, field)
- , retriever
- );
- for(JSONObject eo : enrichmentObject) {
- eo.put(Constants.SENSOR_TYPE, sensorType);
- }
- streamMessageMap.put(enrichmentType, enrichmentObject);
- }
- message.put(getClass().getSimpleName().toLowerCase() + ".splitter.end.ts", "" + System.currentTimeMillis());
- return streamMessageMap;
- }
-
- protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
- if(sensorType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
- if (config != null) {
- return config.getEnrichment().getEnrichmentConfigs();
- } else {
- LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
- }
- } else {
- LOG.error("Trying to retrieve a field map with sensor type of null");
- }
- return new HashMap<>();
- }
- protected Map<String, Object > getFieldMap(String sensorType) {
- if(sensorType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
- if (config != null) {
- return config.getEnrichment().getFieldMap();
- } else {
- LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
- }
- } else {
- LOG.error("Trying to retrieve a field map with sensor type of null");
- }
- return new HashMap<>();
- }
-
- protected String getKeyName(String type, String field) {
- return EnrichmentUtils.getEnrichmentKey(type, field);
- }
-
- @Override
- public void declareOther(OutputFieldsDeclarer declarer) {
-
- }
-
- @Override
- public void emitOther(Tuple tuple, JSONObject message) {
-
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
deleted file mode 100644
index ac6a1cf..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import com.github.benmanes.caffeine.cache.CacheLoader;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.github.benmanes.caffeine.cache.RemovalCause;
-import com.github.benmanes.caffeine.cache.RemovalListener;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.metron.common.Constants;
-import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.storm.common.message.MessageGetters;
-import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.storm.common.utils.StormErrorUtils;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
-
- public static class Perf {} // used for performance logging
- private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected OutputCollector collector;
-
- protected transient CacheLoader<String, Map<String, Tuple>> loader;
- protected transient LoadingCache<String, Map<String, Tuple>> cache;
- protected transient MessageGetStrategy keyGetStrategy;
- protected transient MessageGetStrategy subgroupGetStrategy;
- protected transient MessageGetStrategy messageGetStrategy;
- protected Long maxCacheSize;
- protected Long maxTimeRetain;
-
- public JoinBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- public JoinBolt withMaxCacheSize(long maxCacheSize) {
- this.maxCacheSize = maxCacheSize;
- return this;
- }
-
- public JoinBolt withMaxTimeRetain(long maxTimeRetain) {
- this.maxTimeRetain = maxTimeRetain;
- return this;
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- super.prepare(map, topologyContext, outputCollector);
- perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName());
- keyGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("key");
- subgroupGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("subgroup");
- messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("message");
- this.collector = outputCollector;
- if (this.maxCacheSize == null) {
- throw new IllegalStateException("maxCacheSize must be specified");
- }
- if (this.maxTimeRetain == null) {
- throw new IllegalStateException("maxTimeRetain must be specified");
- }
- loader = s -> new HashMap<>();
- cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
- .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
- .removalListener(new JoinRemoveListener())
- .build(loader);
- prepare(map, topologyContext);
- }
-
- class JoinRemoveListener implements RemovalListener<String, Map<String, Tuple>> {
-
- @Override
- public void onRemoval(@Nullable String s, @Nullable Map<String, Tuple> stringTupleMap, @Nonnull RemovalCause removalCause) {
- if (removalCause == RemovalCause.SIZE) {
- String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt.";
- Exception exception = new Exception(errorMessage);
- LOG.error(errorMessage, exception);
- collector.reportError(exception);
- }
- if (removalCause == RemovalCause.EXPIRED) {
- String errorMessage = "Message was in the join cache too long which may be caused by slow enrichments/threatintels. Increase the maxTimeRetain setting.";
- Exception exception = new Exception(errorMessage);
- LOG.error(errorMessage, exception);
- collector.reportError(exception);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void execute(Tuple tuple) {
- perfLog.mark("execute");
- String streamId = tuple.getSourceStreamId();
- String key = (String) keyGetStrategy.get(tuple);
- String subgroup = (String) subgroupGetStrategy.get(tuple);
- streamId = Joiner.on(":").join("" + streamId, subgroup == null?"":subgroup);
- V message = (V) messageGetStrategy.get(tuple);
- try {
- Map<String, Tuple> streamMessageMap = cache.get(key);
- if (streamMessageMap.containsKey(streamId)) {
- LOG.warn("Received key {} twice for stream {}", key, streamId);
- }
- streamMessageMap.put(streamId, tuple);
- Set<String> streamIds = getStreamIds(message);
- Set<String> streamMessageKeys = streamMessageMap.keySet();
- if ( streamMessageKeys.size() == streamIds.size()
- && Sets.symmetricDifference(streamMessageKeys, streamIds)
- .isEmpty()
- ) {
-
- perfLog.mark("join-message");
- V joinedMessages = joinMessages(streamMessageMap, this.messageGetStrategy);
- perfLog.log("join-message", "key={}, elapsed time to join messages", key);
-
- perfLog.mark("emit-message");
- collector.emit("message",
- tuple,
- new Values(key, joinedMessages));
- perfLog.log("emit-message", "key={}, elapsed time to emit messages", key);
-
- cache.invalidate(key);
- Tuple messageTuple = streamMessageMap.get("message:");
- collector.ack(messageTuple);
- LOG.trace("Emitted message for key: {}", key);
- } else {
- cache.put(key, streamMessageMap);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Missed joining portions for {}. Expected {} != {}",
- getClass().getSimpleName(), key, Joiner.on(",").join(streamIds),
- Joiner.on(",").join(streamMessageKeys));
- }
- }
- } catch (Exception e) {
- LOG.error("[Metron] Unable to join messages: {}", message, e);
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
- .withMessage("Joining problem: " + message)
- .withThrowable(e)
- .addRawMessage(message);
- StormErrorUtils.handleError(collector, error);
- collector.ack(tuple);
- }
- perfLog.log("execute", "key={}, elapsed time to run execute", key);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("message", new Fields("key", "message"));
- declarer.declareStream("error", new Fields("message"));
- }
-
- public abstract void prepare(Map map, TopologyContext topologyContext);
-
- public abstract Set<String> getStreamIds(V value);
-
- public abstract V joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy);
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
deleted file mode 100644
index 48cec0b..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public abstract class SplitBolt<T extends Cloneable> extends ConfiguredEnrichmentBolt {
- public static class Perf {} // used for performance logging
- private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
-
- protected OutputCollector collector;
-
- public SplitBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- @Override
- public final void prepare(Map map, TopologyContext topologyContext,
- OutputCollector outputCollector) {
- super.prepare(map, topologyContext, outputCollector);
- perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName());
- collector = outputCollector;
- prepare(map, topologyContext);
- }
-
- @Override
- public final void execute(Tuple tuple) {
- emit(tuple, generateMessage(tuple));
- }
-
- @Override
- public final void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("message", new Fields("key", "message", "subgroup"));
- for (String streamId : getStreamIds()) {
- declarer.declareStream(streamId, new Fields("key", "message"));
- }
- declarer.declareStream("error", new Fields("message"));
- declareOther(declarer);
- }
-
- public void emit(Tuple tuple, T message) {
- perfLog.mark("emit");
- if (message == null) return;
- String key = getKey(tuple, message);
-
- perfLog.mark("split-message");
- Map<String, List<T>> streamMessageMap = splitMessage(message);
- perfLog.log("split-message", "key={}, elapsed time to split message", key);
-
- for (String streamId : streamMessageMap.keySet()) {
- List<T> streamMessages = streamMessageMap.get(streamId);
- if (streamMessages != null) {
- for (T streamMessage : streamMessages) {
- if (streamMessage == null) {
- streamMessage = getDefaultMessage(streamId);
- }
- collector.emit(streamId, new Values(key, streamMessage));
- }
- } else {
- throw new IllegalArgumentException("Enrichment must send some list of messages, not null.");
- }
- }
- collector.emit("message", tuple, new Values(key, message, ""));
- collector.ack(tuple);
- emitOther(tuple, message);
- perfLog.log("emit", "key={}, elapsed time to run emit", key);
- }
-
- protected T getDefaultMessage(String streamId) {
- throw new IllegalArgumentException("Could not find a message for stream: " + streamId);
- }
-
- public abstract void prepare(Map map, TopologyContext topologyContext);
-
- public abstract Set<String> getStreamIds();
-
- public abstract String getKey(Tuple tuple, T message);
-
- public abstract T generateMessage(Tuple tuple);
-
- public abstract Map<String, List<T>> splitMessage(T message);
-
- public abstract void declareOther(OutputFieldsDeclarer declarer);
-
- public abstract void emitOther(Tuple tuple, T message);
-
-
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
deleted file mode 100644
index 2b19375..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
-import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
-import org.apache.metron.enrichment.utils.ThreatIntelUtils;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
-
- protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-
- /**
- * The Stellar function resolver.
- */
- private FunctionResolver functionResolver;
-
- /**
- * The execution context for Stellar.
- */
- private Context stellarContext;
-
- public ThreatIntelJoinBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- @Override
- protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
- if(sensorType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
- if (config != null) {
- return config.getThreatIntel().getEnrichmentConfigs();
- } else {
- LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
- }
- } else {
- LOG.error("Trying to retrieve a field map with sensor type of null");
- }
- return new HashMap<>();
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext) {
- super.prepare(map, topologyContext);
- GeoLiteCityDatabase.INSTANCE.update((String)getConfigurations().getGlobalConfig().get(
- GeoLiteCityDatabase.GEO_HDFS_FILE));
- GeoLiteAsnDatabase.INSTANCE.update((String)getConfigurations().getGlobalConfig().get(
- GeoLiteAsnDatabase.ASN_HDFS_FILE));
- initializeStellar();
- }
-
- protected void initializeStellar() {
- this.stellarContext = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
- .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
- .build();
- StellarFunctions.initialize(stellarContext);
- this.functionResolver = StellarFunctions.FUNCTION_RESOLVER();
- }
-
- @Override
- public Map<String, Object> getFieldMap(String sourceType) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
- if(config != null) {
- return config.getThreatIntel().getFieldMap();
- }
- else {
- LOG.debug("Unable to retrieve sensor config: {}", sourceType);
- return null;
- }
- }
-
-
- @Override
- public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
- JSONObject ret = super.joinMessages(streamMessageMap, messageGetStrategy);
- String sourceType = MessageUtils.getSensorType(ret);
- return ThreatIntelUtils.triage(ret, getConfigurations().getSensorEnrichmentConfig(sourceType), functionResolver, stellarContext);
- }
-
- @Override
- public void reloadCallback(String name, ConfigurationType type) {
- super.reloadCallback(name, type);
- if(type == ConfigurationType.GLOBAL) {
- GeoLiteCityDatabase.INSTANCE.updateIfNecessary(getConfigurations().getGlobalConfig());
- }
- }
-
-
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
deleted file mode 100644
index 76c65c6..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.enrichment.utils.ThreatIntelUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
-
- public ThreatIntelSplitterBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- @Override
- protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
- if(sensorType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
- if (config != null) {
- return config.getThreatIntel().getEnrichmentConfigs();
- } else {
- LOG.debug("Unable to retrieve a sensor config of {}", sensorType);
- }
- } else {
- LOG.error("Trying to retrieve a field map with sensor type of null");
- }
- return new HashMap<>();
- }
-
- @Override
- protected Map<String, Object> getFieldMap(String sensorType) {
- if (sensorType != null) {
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
- if (config != null) {
- return config.getThreatIntel().getFieldMap();
- } else {
- LOG.debug("Unable to retrieve sensor config: {}", sensorType);
- }
- } else {
- LOG.error("Trying to retrieve a field map with sensor type of null");
- }
- return new HashMap<>();
- }
-
- @Override
- protected String getKeyName(String type, String field) {
- return ThreatIntelUtils.getThreatIntelKey(type, field);
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh
index d3ed8ad..d9c2dc4 100755
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh
@@ -19,12 +19,9 @@
METRON_VERSION=${project.version}
METRON_HOME=/usr/metron/$METRON_VERSION
TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-
-# There are two enrichment topologies. By default, the unified enrichment topology is executed. Split-join is now deprecated.
-SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml --filter $METRON_HOME/config/enrichment-splitjoin.properties"
-UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml --filter $METRON_HOME/config/enrichment-unified.properties"
+DEFAULT_ARGS="--remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/enrichment.properties"
# by passing in different args, the user can execute an alternative enrichment topology
-ARGS=${@:-$UNIFIED_ARGS}
+ARGS=${@:-$DEFAULT_ARGS}
storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
deleted file mode 100644
index ed623f3..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest {
-
- private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
- /**
- * {
- * "enrichedField": "enrichedValue",
- * "emptyEnrichedField": ""
- * }
- */
- @Multiline
- private String enrichedMessageString;
-
- /**
- * {
- * "ip_src_addr": "ip1",
- * "ip_dst_addr": "ip2",
- * "source.type": "test",
- * "enrichedField": "enrichedValue"
- * }
- */
- @Multiline
- private String expectedJoinedMessageString;
-
- private JSONObject enrichedMessage;
- private JSONObject expectedJoinedMessage;
-
- @Before
- public void parseMessages() throws ParseException {
- JSONParser parser = new JSONParser();
- enrichedMessage = (JSONObject) parser.parse(enrichedMessageString);
- expectedJoinedMessage = (JSONObject) parser.parse(expectedJoinedMessageString);
- }
-
- @Test
- public void test() throws IOException {
- EnrichmentJoinBolt enrichmentJoinBolt = new EnrichmentJoinBolt("zookeeperUrl");
- enrichmentJoinBolt.setCuratorFramework(client);
- enrichmentJoinBolt.setZKCache(cache);
- enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(enrichmentConfigPath));
- enrichmentJoinBolt.withMaxCacheSize(100);
- enrichmentJoinBolt.withMaxTimeRetain(10000);
- enrichmentJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
- Set<String> actualStreamIds = enrichmentJoinBolt.getStreamIds(sampleMessage);
- Assert.assertEquals(joinStreamIds, actualStreamIds);
- Map<String, Tuple> streamMessageMap = new HashMap<>();
- MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class);
- Tuple sampleTuple = mock(Tuple.class);
- when(messageGetStrategy.get(sampleTuple)).thenReturn(sampleMessage);
- Tuple enrichedTuple = mock(Tuple.class);
- when(messageGetStrategy.get(enrichedTuple)).thenReturn(enrichedMessage);
- streamMessageMap.put("message", sampleTuple);
- streamMessageMap.put("enriched", enrichedTuple);
- JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
- removeTimingFields(joinedMessage);
- Assert.assertEquals(expectedJoinedMessage, joinedMessage);
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
deleted file mode 100644
index 1e9177b..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.metron.enrichment.configuration.Enrichment;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.ParseException;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class EnrichmentSplitterBoltTest extends BaseEnrichmentBoltTest {
-
- private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
- @Test
- public void test() throws ParseException, IOException {
- final Enrichment geo = new Enrichment();
- geo.setType("geo");
- final Enrichment host = new Enrichment();
- host.setType("host");
- final Enrichment hbaseEnrichment = new Enrichment();
- hbaseEnrichment.setType("hbaseEnrichment");
- final Enrichment stellarEnrichment = new Enrichment();
- stellarEnrichment.setType("stellar");
- List<Enrichment> enrichments = new ArrayList<Enrichment>() {{
- add(geo);
- add(host);
- add(hbaseEnrichment);
- add(stellarEnrichment);
- }};
-
- EnrichmentSplitterBolt enrichmentSplitterBolt = new EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments);
- enrichmentSplitterBolt.setCuratorFramework(client);
- enrichmentSplitterBolt.setZKCache(cache);
- enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(enrichmentConfigPath));
- enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
- String key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
- Assert.assertTrue(key != null && key.length() == 36);
- String someKey = "someKey";
- when(tuple.getStringByField("key")).thenReturn(someKey);
- key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
- Assert.assertEquals(someKey, key);
- String guid = "sample-guid";
- when(sampleMessage.get("guid")).thenReturn(guid);
- key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
- Assert.assertEquals(guid, key);
- when(tuple.getBinary(0)).thenReturn(sampleMessageString.getBytes());
- JSONObject generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
- removeTimingFields(generatedMessage);
- Assert.assertEquals(sampleMessage, generatedMessage);
- String messageFieldName = "messageFieldName";
- enrichmentSplitterBolt.withMessageFieldName(messageFieldName);
- when(tuple.getValueByField(messageFieldName)).thenReturn(sampleMessage);
- generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
- Assert.assertEquals(sampleMessage, generatedMessage);
- Set<String> actualStreamIds = enrichmentSplitterBolt.getStreamIds();
- Assert.assertEquals(streamIds, actualStreamIds);
-
- Map<String, List<JSONObject> > actualSplitMessages = enrichmentSplitterBolt.splitMessage(sampleMessage);
- Assert.assertEquals(enrichments.size(), actualSplitMessages.size());
- Assert.assertEquals(ImmutableList.of(geoMessage), actualSplitMessages.get("geo"));
- Assert.assertEquals(ImmutableList.of(hostMessage), actualSplitMessages.get("host"));
- Assert.assertEquals(ImmutableList.of(hbaseEnrichmentMessage), actualSplitMessages.get("hbaseEnrichment"));
-
-
- }
-
- @Override
- public void removeTimingFields(JSONObject message) {
- ImmutableSet keys = ImmutableSet.copyOf(message.keySet());
- for(Object key: keys) {
- if (key.toString().contains("splitter.begin.ts")) {
- message.remove(key);
- }
- }
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
deleted file mode 100644
index 8bdf409..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-public class JoinBoltTest extends BaseEnrichmentBoltTest {
-
- public class StandAloneJoinBolt extends JoinBolt<JSONObject> {
-
- public StandAloneJoinBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext) {
-
- }
-
- @Override
- public Set<String> getStreamIds(JSONObject value) {
- HashSet<String> ret = new HashSet<>();
- for(String s : streamIds) {
- ret.add(s + ":");
- }
- ret.add("message:");
- return ret;
- }
-
- @Override
- public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
- return joinedMessage;
- }
- }
-
- /**
- {
- "joinField": "joinValue"
- }
- */
- @Multiline
- private String joinedMessageString;
-
- private JSONObject joinedMessage;
- private JoinBolt<JSONObject> joinBolt;
-
- @Before
- public void parseMessages() {
- JSONParser parser = new JSONParser();
- try {
- joinedMessage = (JSONObject) parser.parse(joinedMessageString);
- } catch (ParseException e) {
- e.printStackTrace();
- }
- joinBolt = new StandAloneJoinBolt("zookeeperUrl");
- joinBolt.setCuratorFramework(client);
- joinBolt.setZKCache(cache);
- }
-
- @Test
- public void testPrepare() {
- try {
- joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
- fail("Should fail if a maxCacheSize property is not set");
- } catch(IllegalStateException e) {}
- joinBolt.withMaxCacheSize(100);
- try {
- joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
- fail("Should fail if a maxTimeRetain property is not set");
- } catch(IllegalStateException e) {}
- joinBolt.withMaxTimeRetain(10000);
- joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
- }
-
- @Test
- public void testDeclareOutputFields() {
- joinBolt.declareOutputFields(declarer);
- verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message")));
- verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
- verifyNoMoreInteractions(declarer);
- }
-
- @Test
- public void testExecute() {
- joinBolt.withMaxCacheSize(100);
- joinBolt.withMaxTimeRetain(10000);
- joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
-
- Tuple geoTuple = mock(Tuple.class);
- when(geoTuple.getValueByField("key")).thenReturn(key);
- when(geoTuple.getSourceStreamId()).thenReturn("geo");
- when(geoTuple.getValueByField("message")).thenReturn(geoMessage);
- joinBolt.execute(geoTuple);
-
- Tuple messageTuple = mock(Tuple.class);
- when(messageTuple.getValueByField("key")).thenReturn(key);
- when(messageTuple.getSourceStreamId()).thenReturn("message");
- when(messageTuple.getValueByField("message")).thenReturn(sampleMessage);
- joinBolt.execute(messageTuple);
-
- Tuple hostTuple = mock(Tuple.class);
- when(hostTuple.getValueByField("key")).thenReturn(key);
- when(hostTuple.getSourceStreamId()).thenReturn("host");
- when(hostTuple.getValueByField("message")).thenReturn(hostMessage);
- joinBolt.execute(hostTuple);
-
- Tuple hbaseEnrichmentTuple = mock(Tuple.class);
- when(hbaseEnrichmentTuple.getValueByField("key")).thenReturn(key);
- when(hbaseEnrichmentTuple.getSourceStreamId()).thenReturn("hbaseEnrichment");
- when(hbaseEnrichmentTuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage);
- joinBolt.execute(hbaseEnrichmentTuple);
-
- Tuple stellarTuple = mock(Tuple.class);
- when(stellarTuple.getValueByField("key")).thenReturn(key);
- when(stellarTuple.getSourceStreamId()).thenReturn("stellar");
- when(stellarTuple.getValueByField("message")).thenReturn(new JSONObject());
- joinBolt.execute(stellarTuple);
-
- verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage)));
- verify(outputCollector, times(1)).ack(messageTuple);
-
- verifyNoMoreInteractions(outputCollector);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testExecuteShouldReportError() throws ExecutionException {
- joinBolt.withMaxCacheSize(100);
- joinBolt.withMaxTimeRetain(10000);
- joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
- when(tuple.getValueByField("key")).thenReturn(key);
- when(tuple.getValueByField("message")).thenReturn(new JSONObject());
- joinBolt.cache = mock(LoadingCache.class);
- when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception")));
-
- joinBolt.execute(tuple);
- RuntimeException expectedExecutionException = new RuntimeException(new Exception("join exception"));
- MetronError error = new MetronError()
- .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
- .withMessage("Joining problem: {}")
- .withThrowable(expectedExecutionException)
- .addRawMessage(new JSONObject());
- verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
- verify(outputCollector, times(1)).reportError(any(ExecutionException.class));
- verify(outputCollector, times(1)).ack(eq(tuple));
- verifyNoMoreInteractions(outputCollector);
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
deleted file mode 100644
index 57dae13..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.ImmutableList;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.json.simple.JSONObject;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-public class SplitBoltTest extends BaseEnrichmentBoltTest {
-
- public class StandAloneSplitBolt extends SplitBolt<JSONObject> {
-
- public StandAloneSplitBolt(String zookeeperUrl) {
- super(zookeeperUrl);
- }
-
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext) {
-
- }
-
- @Override
- public Set<String> getStreamIds() {
- return streamIds;
- }
-
- @Override
- public String getKey(Tuple tuple, JSONObject message) {
- return key;
- }
-
- @Override
- public JSONObject generateMessage(Tuple tuple) {
- return sampleMessage;
- }
-
- @Override
- public Map<String, List<JSONObject>> splitMessage(JSONObject message) {
- return null;
- }
-
- @Override
- public void declareOther(OutputFieldsDeclarer declarer) {
-
- }
-
- @Override
- public void emitOther(Tuple tuple, JSONObject message) {
-
- }
- }
-
- @Test
- public void test() {
- StandAloneSplitBolt splitBolt = spy(new StandAloneSplitBolt("zookeeperUrl"));
- splitBolt.setCuratorFramework(client);
- splitBolt.setZKCache(cache);
- doCallRealMethod().when(splitBolt).reloadCallback(anyString(), any(ConfigurationType.class));
- splitBolt.prepare(new HashMap(), topologyContext, outputCollector);
- splitBolt.declareOutputFields(declarer);
- verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message", "subgroup")));
- for(String streamId: streamIds) {
- verify(declarer, times(1)).declareStream(eq(streamId), argThat(new FieldsMatcher("key", "message")));
- }
- verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
-
- JSONObject sampleMessage = splitBolt.generateMessage(tuple);
- Map<String, List<JSONObject>> streamMessageMap = new HashMap<>();
- streamMessageMap.put("geo", ImmutableList.of(geoMessage));
- streamMessageMap.put("host", ImmutableList.of(hostMessage));
- streamMessageMap.put("hbaseEnrichment", ImmutableList.of(hbaseEnrichmentMessage));
- doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
- splitBolt.execute(tuple);
- verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, sampleMessage, "")));
- verify(outputCollector, times(1)).emit(eq("geo"), eq(new Values(key, geoMessage)));
- verify(outputCollector, times(1)).emit(eq("host"), eq(new Values(key, hostMessage)));
- verify(outputCollector, times(1)).emit(eq("hbaseEnrichment"), eq(new Values(key, hbaseEnrichmentMessage)));
- verify(outputCollector, times(1)).ack(tuple);
- streamMessageMap = new HashMap<>();
- streamMessageMap.put("host", null);
- doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
- try {
- splitBolt.execute(tuple);
- Assert.fail("An exception should be thrown when splitMessage produces a null value for a stream");
- }catch (IllegalArgumentException e) {}
- }
-
-
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
deleted file mode 100644
index aeafede..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.databind.JsonMappingException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
-import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest {
-
- private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
- /**
- * {
- * "field1": "value1",
- * "enrichedField1": "enrichedValue1",
- * "source.type": "test"
- * }
- */
- @Multiline
- private String messageString;
-
- /**
- * {
- * "field1": "value1",
- * "enrichedField1": "enrichedValue1",
- * "source.type": "test",
- * "threatintels.field.end.ts": "timing"
- * }
- */
- @Multiline
- private String messageWithTimingString;
-
- /**
- * {
- * "field1": "value1",
- * "enrichedField1": "enrichedValue1",
- * "source.type": "test",
- * "threatintels.field": "threatIntelValue"
- * }
- */
- @Multiline
- private String alertMessageString;
-
- private JSONObject message;
- private JSONObject messageWithTiming;
- private JSONObject alertMessage;
-
- @Before
- public void parseMessages() throws ParseException {
- JSONParser parser = new JSONParser();
- message = (JSONObject) parser.parse(messageString);
- messageWithTiming = (JSONObject) parser.parse(messageWithTimingString);
- alertMessage = (JSONObject) parser.parse(alertMessageString);
- }
-
- /**
- * {
- * "riskLevelRules" : [
- * {
- * "rule" : "enrichedField1 == 'enrichedValue1'",
- * "score" : 10
- * }
- * ],
- * "aggregator" : "MAX"
- * }
- */
- @Multiline
- private static String testWithTriageConfig;
-
- @Test
- public void testWithTriage() throws IOException {
- test(testWithTriageConfig, false);
- }
-
- /**
- * {
- * "riskLevelRules" : [
- * {
- * "rule" : "enrichedField1 == 'enrichedValue1",
- * "score" : 10
- * }
- * ],
- * "aggregator" : "MAX"
- * }
- */
- @Multiline
- private static String testWithBadTriageRuleConfig;
-
- @Test
- public void testWithBadTriageRule() throws IOException {
- test(testWithBadTriageRuleConfig, true);
- }
-
- @Test
- public void testWithoutTriage() throws IOException {
- test(null, false);
- }
-
- /**
- * {
- * "riskLevelRules": [
- * {
- * "rule" : "not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))",
- * "score" : 10
- * }
- * ],
- * "aggregator": "MAX"
- * }
- */
- @Multiline
- private static String testWithStellarFunctionConfig;
-
- @Test
- public void testWithStellarFunction() throws IOException {
- test(testWithStellarFunctionConfig, false);
- }
-
- public void test(String threatTriageConfig, boolean badConfig) throws IOException {
-
- ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl");
- threatIntelJoinBolt.setCuratorFramework(client);
- threatIntelJoinBolt.setZKCache(cache);
-
- SensorEnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(
- new FileInputStream(enrichmentConfigPath), SensorEnrichmentConfig.class);
- boolean withThreatTriage = threatTriageConfig != null;
- if (withThreatTriage) {
- try {
- enrichmentConfig.getThreatIntel().setTriageConfig(JSONUtils.INSTANCE.load(threatTriageConfig, ThreatTriageConfig.class));
- if (badConfig) {
- Assert.fail(threatTriageConfig + "\nThis should not parse!");
- }
- } catch (JsonMappingException pe) {
- if (!badConfig) {
- throw pe;
- }
- }
- }
- threatIntelJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, enrichmentConfig);
- HashMap<String, Object> globalConfig = new HashMap<>();
- String baseDir = UnitTestHelper.findDir(new File("../metron-enrichment-common"), "GeoLite");
- File geoHdfsFile = new File(new File(baseDir), "GeoLite2-City.mmdb.gz");
- globalConfig.put(GeoLiteCityDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
- File asnHdfsFile = new File(new File(baseDir), "GeoLite2-ASN.tar.gz");
- globalConfig.put(GeoLiteAsnDatabase.ASN_HDFS_FILE, asnHdfsFile.getAbsolutePath());
- threatIntelJoinBolt.getConfigurations().updateGlobalConfig(globalConfig);
- threatIntelJoinBolt.withMaxCacheSize(100);
- threatIntelJoinBolt.withMaxTimeRetain(10000);
- threatIntelJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
- Map<String, Object> fieldMap = threatIntelJoinBolt.getFieldMap("incorrectSourceType");
- Assert.assertNull(fieldMap);
-
- fieldMap = threatIntelJoinBolt.getFieldMap(sensorType);
- Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel"));
-
- MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class);
- Tuple messageTuple = mock(Tuple.class);
- when(messageGetStrategy.get(messageTuple)).thenReturn(message);
- Map<String, Tuple> streamMessageMap = new HashMap<>();
- streamMessageMap.put("message", messageTuple);
- JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
- assertFalse(joinedMessage.containsKey("is_alert"));
-
- when(messageGetStrategy.get(messageTuple)).thenReturn(messageWithTiming);
- joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
- assertFalse(joinedMessage.containsKey("is_alert"));
-
- when(messageGetStrategy.get(messageTuple)).thenReturn(alertMessage);
- joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
- assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert")));
-
- if(withThreatTriage && !badConfig) {
- assertTrue(joinedMessage.containsKey("threat.triage.score"));
- Double score = (Double) joinedMessage.get("threat.triage.score");
- assertTrue(Math.abs(10d - score) < 1e-10);
- }
- else {
- assertFalse(joinedMessage.containsKey("threat.triage.score"));
- }
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
deleted file mode 100644
index a27297f..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ThreatIntelSplitterBoltTest extends BaseEnrichmentBoltTest {
-
- private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
- @Test
- public void test() throws IOException {
- String threatIntelType = "hbaseThreatIntel";
- ThreatIntelSplitterBolt threatIntelSplitterBolt = new ThreatIntelSplitterBolt("zookeeperUrl");
- threatIntelSplitterBolt.setCuratorFramework(client);
- threatIntelSplitterBolt.setZKCache(cache);
- threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(enrichmentConfigPath));
- threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
- Map<String, Object> fieldMap = threatIntelSplitterBolt.getFieldMap(sensorType);
- Assert.assertTrue(fieldMap.containsKey(threatIntelType));
- String fieldName = threatIntelSplitterBolt.getKeyName(threatIntelType, "field");
- Assert.assertEquals("threatintels.hbaseThreatIntel.field", fieldName);
- }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 6b047fb..a138c0d 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -23,18 +23,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
@@ -44,7 +32,6 @@ import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
import org.apache.metron.enrichment.converter.EnrichmentHelper;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
@@ -54,6 +41,7 @@ import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
@@ -66,8 +54,21 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
/**
- * Integration test for the 'Split-Join' enrichment topology.
+ * Integration test for the enrichment topology.
*/
public class EnrichmentIntegrationTest extends BaseIntegrationTest {
@@ -98,12 +99,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
private static File geoHdfsFile;
private static File asnHdfsFile;
- protected String fluxPath() {
- return "src/main/flux/enrichment/remote-splitjoin.yaml";
- }
-
private static List<byte[]> getInputMessages(String path){
- try{
+ try {
List<byte[]> ret = TestUtils.readSampleData(path);
{
//we want one of the fields without a destination IP to ensure that enrichments can function
@@ -113,7 +110,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
ret.add(JSONUtils.INSTANCE.toJSONPretty(sansDestinationIp));
}
return ret;
- }catch(IOException ioe){
+ } catch(IOException ioe){
return null;
}
}
@@ -126,34 +123,41 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
}
/**
- * Returns the path to the topology properties template.
- *
* @return The path to the topology properties template.
*/
public String getTemplatePath() {
- return "src/main/config/enrichment-splitjoin.properties.j2";
+ return "src/main/config/enrichment.properties.j2";
+ }
+
+ /**
+ * @return The path to the flux file defining the topology.
+ */
+ public String fluxPath() {
+ return "src/main/flux/enrichment/remote.yaml";
}
/**
- * Properties for the 'Split-Join' topology.
- *
* @return The topology properties.
*/
public Properties getTopologyProperties() {
return new Properties() {{
+
+ // storm
setProperty("enrichment_workers", "1");
setProperty("enrichment_acker_executors", "0");
setProperty("enrichment_topology_worker_childopts", "");
setProperty("topology_auto_credentials", "[]");
- setProperty("enrichment_topology_max_spout_pending", "");
- setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
+ setProperty("enrichment_topology_max_spout_pending", "500");
+
+ // kafka - zookeeper_quorum, kafka_brokers set elsewhere
setProperty("kafka_security_protocol", "PLAINTEXT");
+ setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
setProperty("enrichment_error_topic", ERROR_TOPIC);
setProperty("threatintel_error_topic", ERROR_TOPIC);
- setProperty("enrichment_join_cache_size", "1000");
- setProperty("threatintel_join_cache_size", "1000");
+
+ // enrichment
setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName());
setProperty("enrichment_hbase_table", enrichmentsTableName);
setProperty("enrichment_hbase_cf", cf);
@@ -161,19 +165,28 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
"{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+
+ // threat intel
setProperty("threatintel_hbase_table", threatIntelTableName);
setProperty("threatintel_hbase_cf", cf);
- setProperty("enrichment_kafka_spout_parallelism", "1");
- setProperty("enrichment_split_parallelism", "1");
- setProperty("enrichment_stellar_parallelism", "1");
- setProperty("enrichment_join_parallelism", "1");
- setProperty("threat_intel_split_parallelism", "1");
- setProperty("threat_intel_stellar_parallelism", "1");
- setProperty("threat_intel_join_parallelism", "1");
- setProperty("kafka_writer_parallelism", "1");
+
+ // parallelism
+ setProperty("unified_kafka_spout_parallelism", "1");
+ setProperty("unified_enrichment_parallelism", "1");
+ setProperty("unified_threat_intel_parallelism", "1");
+ setProperty("unified_kafka_writer_parallelism", "1");
+
+ // caches
+ setProperty("unified_enrichment_cache_size", "1000");
+ setProperty("unified_threat_intel_cache_size", "1000");
+
+ // threads
+ setProperty("unified_enrichment_threadpool_size", "1");
+ setProperty("unified_enrichment_threadpool_type", "FIXED");
}};
}
+
@Test
public void test() throws Exception {
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
deleted file mode 100644
index 93c9700..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.integration;
-
-import org.apache.metron.common.Constants;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-
-import java.util.Properties;
-
-/**
- * Integration test for the 'Unified' enrichment topology.
- */
-public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
-
- /**
- * Returns the path to the topology properties template.
- *
- * @return The path to the topology properties template.
- */
- public String getTemplatePath() {
- return "src/main/config/enrichment-unified.properties.j2";
- }
-
- /**
- * Properties for the 'Unified' topology.
- *
- * @return The topology properties.
- */
- @Override
- public Properties getTopologyProperties() {
- return new Properties() {{
-
- // storm
- setProperty("enrichment_workers", "1");
- setProperty("enrichment_acker_executors", "0");
- setProperty("enrichment_topology_worker_childopts", "");
- setProperty("topology_auto_credentials", "[]");
- setProperty("enrichment_topology_max_spout_pending", "500");
-
- // kafka - zookeeper_quorum, kafka_brokers set elsewhere
- setProperty("kafka_security_protocol", "PLAINTEXT");
- setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
- setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
- setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
- setProperty("enrichment_error_topic", ERROR_TOPIC);
- setProperty("threatintel_error_topic", ERROR_TOPIC);
-
- // enrichment
- setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName());
- setProperty("enrichment_hbase_table", enrichmentsTableName);
- setProperty("enrichment_hbase_cf", cf);
- setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
- "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
- "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
- "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
-
- // threat intel
- setProperty("threatintel_hbase_table", threatIntelTableName);
- setProperty("threatintel_hbase_cf", cf);
-
- // parallelism
- setProperty("unified_kafka_spout_parallelism", "1");
- setProperty("unified_enrichment_parallelism", "1");
- setProperty("unified_threat_intel_parallelism", "1");
- setProperty("unified_kafka_writer_parallelism", "1");
-
- // caches
- setProperty("unified_enrichment_cache_size", "1000");
- setProperty("unified_threat_intel_cache_size", "1000");
-
- // threads
- setProperty("unified_enrichment_threadpool_size", "1");
- setProperty("unified_enrichment_threadpool_type", "FIXED");
- }};
- }
-
- @Override
- public String fluxPath() {
- return "src/main/flux/enrichment/remote-unified.yaml";
- }
-}