You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/01 12:25:57 UTC

[metron] branch master updated: METRON-2164 Remove the Split-Join Enrichment Topology (nickwallen) closes apache/metron#1448

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 02f88a3  METRON-2164 Remove the Split-Join Enrichment Topology (nickwallen) closes apache/metron#1448
02f88a3 is described below

commit 02f88a313068a876e20227a2347cedafb4cec0ae
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Mon Jul 1 08:25:35 2019 -0400

    METRON-2164 Remove the Split-Join Enrichment Topology (nickwallen) closes apache/metron#1448
---
 Upgrading.md                                       |   5 +
 metron-deployment/packaging/ambari/.gitignore      |   2 -
 .../packaging/ambari/metron-mpack/pom.xml          |   6 +-
 .../configuration/metron-enrichment-env.xml        |  82 ---
 .../CURRENT/package/scripts/enrichment_commands.py |  15 +-
 .../CURRENT/package/scripts/enrichment_master.py   |   9 +-
 .../CURRENT/package/scripts/params/params_linux.py |  12 -
 .../METRON/CURRENT/themes/metron_theme.json        |  63 +--
 .../packaging/docker/rpm-docker/SPECS/metron.spec  |   6 +-
 metron-platform/Performance-tuning-guide.md        |  10 +-
 .../metron-enrichment-storm/README.md              |  15 +-
 .../metron-enrichment-storm/enrichment_arch.png    | Bin 113606 -> 0 bytes
 .../main/config/enrichment-splitjoin.properties    |  63 ---
 .../main/config/enrichment-splitjoin.properties.j2 |  63 ---
 ...nt-unified.properties => enrichment.properties} |   0
 ...fied.properties.j2 => enrichment.properties.j2} |   0
 .../src/main/flux/enrichment/remote-splitjoin.yaml | 593 ---------------------
 .../{remote-unified.yaml => remote.yaml}           |   0
 .../metron/enrichment/bolt/EnrichmentJoinBolt.java | 122 -----
 .../enrichment/bolt/EnrichmentSplitterBolt.java    | 184 -------
 .../apache/metron/enrichment/bolt/JoinBolt.java    | 189 -------
 .../apache/metron/enrichment/bolt/SplitBolt.java   | 113 ----
 .../enrichment/bolt/ThreatIntelJoinBolt.java       | 123 -----
 .../enrichment/bolt/ThreatIntelSplitterBolt.java   |  67 ---
 .../src/main/scripts/start_enrichment_topology.sh  |   7 +-
 .../enrichment/bolt/EnrichmentJoinBoltTest.java    |  96 ----
 .../bolt/EnrichmentSplitterBoltTest.java           | 106 ----
 .../metron/enrichment/bolt/JoinBoltTest.java       | 193 -------
 .../metron/enrichment/bolt/SplitBoltTest.java      | 123 -----
 .../enrichment/bolt/ThreatIntelJoinBoltTest.java   | 222 --------
 .../bolt/ThreatIntelSplitterBoltTest.java          |  46 --
 .../integration/EnrichmentIntegrationTest.java     |  87 +--
 .../UnifiedEnrichmentIntegrationTest.java          |  96 ----
 33 files changed, 70 insertions(+), 2648 deletions(-)

diff --git a/Upgrading.md b/Upgrading.md
index d59aa57..66fe9c3 100644
--- a/Upgrading.md
+++ b/Upgrading.md
@@ -19,6 +19,11 @@ limitations under the License.
 This document constitutes a per-version listing of changes of
 configuration which are non-backwards compatible.
 
+## 0.7.1 to 0.7.2
+
+### [METRON-2164: Remove the Split-Join Enrichment Topology](https://issues.apache.org/jira/browse/METRON-2164)
+The Split-Join Enrichment topology has been deprecated since November 2018. Metron has defaulted to using the Unified Enrichment topology since that time. All users of the Split-Join Enrichment topology should migrate to the Unified Enrichment topology. Both topologies provide equivalent functionality.
+
 ## 0.7.0 to 0.7.1
 
 ### [METRON-2100: Update developer documentation for full dev management UI parser aggregation feature gap](https://issues.apache.org/jira/browse/METRON-2100)
diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore
index 242a4da..ab5dc85 100644
--- a/metron-deployment/packaging/ambari/.gitignore
+++ b/metron-deployment/packaging/ambari/.gitignore
@@ -4,6 +4,4 @@ elasticsearch.properties.j2
 solr.properties.j2
 hdfs.properties.j2
 enrichment.properties.j2
-enrichment-splitjoin.properties.j2
-enrichment-unified.properties.j2
 pcap.properties.j2
diff --git a/metron-deployment/packaging/ambari/metron-mpack/pom.xml b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
index ae62bd8..35bf369 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/pom.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/pom.xml
@@ -110,8 +110,7 @@
                                 <resource>
                                     <directory>${basedir}/../../../../metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config</directory>
                                     <includes>
-                                        <include>enrichment-splitjoin.properties.j2</include>
-                                        <include>enrichment-unified.properties.j2</include>
+                                        <include>enrichment.properties.j2</include>
                                     </includes>
                                     <filtering>false</filtering>
                                 </resource>
@@ -185,8 +184,7 @@
                         <fileset>
                             <directory>${basedir}/src/main/resources/common-services/METRON/CURRENT/package/templates</directory>
                             <includes>
-                                <include>enrichment-unified.properties.j2</include>
-                                <include>enrichment-splitjoin.properties.j2</include>
+                                <include>enrichment.properties.j2</include>
                                 <include>elasticsearch.properties.j2</include>
                                 <include>hdfs.properties.j2</include>
                             </includes>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml
index 1fd4702..9fe29e3 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-enrichment-env.xml
@@ -193,88 +193,6 @@
       <empty-value-valid>true</empty-value-valid>
     </value-attributes>
   </property>
-  <property>
-    <name>enrichment_topology</name>
-    <description>Which Enrichment topology to execute. Note: Split-Join is deprecated in favor of the Unified topology.</description>
-    <value>Unified</value>
-    <display-name>Enrichment Topology</display-name>
-    <value-attributes>
-      <type>value-list</type>
-      <entries>
-        <entry>
-          <value>Unified</value>
-        </entry>
-        <entry>
-          <value>Split-Join</value>
-        </entry>
-      </entries>
-      <selection-cardinality>1</selection-cardinality>
-    </value-attributes>
-  </property>
-
-  <!--
-    split-join topology parameters
-  -->
-  <property>
-    <name>enrichment_join_cache_size</name>
-    <description>Enrichment join bolt max cache size for the Split Join Enrichment Topology</description>
-    <value>100000</value>
-    <display-name>Enrichment Join Max Cache Size</display-name>
-  </property>
-  <property>
-    <name>threatintel_join_cache_size</name>
-    <description>Threat Intel join bolt max cache size for the Split Join Enrichment Topology</description>
-    <value>100000</value>
-    <display-name>Threat Intel Join Max Cache Size</display-name>
-  </property>
-  <property>
-    <name>enrichment_kafka_spout_parallelism</name>
-    <description>Kafka spout parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Enrichment Spout Parallelism</display-name>
-  </property>
-  <property>
-    <name>enrichment_split_parallelism</name>
-    <description>Enrichment split bolt parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Enrichment Split Parallelism</display-name>
-  </property>
-  <property>
-    <name>enrichment_stellar_parallelism</name>
-    <description>Stellar enrichment bolt parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Stellar Enrichment Parallelism</display-name>
-  </property>
-  <property>
-    <name>enrichment_join_parallelism</name>
-    <description>Enrichment join bolt parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Enrichment Join Parallelism</display-name>
-  </property>
-  <property>
-    <name>threat_intel_split_parallelism</name>
-    <description>Threat Intel split bolt parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Threat Intel Split Parallelism</display-name>
-  </property>
-  <property>
-    <name>threat_intel_stellar_parallelism</name>
-    <description>Threat Intel stellar bolt parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Threat Intel Stellar Parallelism</display-name>
-  </property>
-  <property>
-    <name>threat_intel_join_parallelism</name>
-    <description>Threat Intel join bolt parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Threat Intel Join Parallelism</display-name>
-  </property>
-  <property>
-    <name>kafka_writer_parallelism</name>
-    <description>Kafka writer parallelism for the Split Join Enrichment Topology</description>
-    <value>1</value>
-    <display-name>Enrichment Kafka Writer Parallelism</display-name>
-  </property>
 
   <!--
     unified topology parameters
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 5d82c8c..80cc1fa 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -146,20 +146,9 @@ class EnrichmentCommands:
 
     def start_enrichment_topology(self, env):
         Logger.info("Starting Metron enrichment topology: {0}".format(self.__enrichment_topology))
-
         if not self.is_topology_active(env):
-
-            # which enrichment topology needs started?
-            if self.__params.enrichment_topology == "Unified":
-                topology_flux = "{0}/flux/enrichment/remote-unified.yaml".format(self.__params.metron_home)
-                topology_props = "{0}/config/enrichment-unified.properties".format(self.__params.metron_home)
-            elif self.__params.enrichment_topology == "Split-Join":
-                topology_flux = "{0}/flux/enrichment/remote-splitjoin.yaml".format(self.__params.metron_home)
-                topology_props = "{0}/config/enrichment-splitjoin.properties".format(self.__params.metron_home)
-            else:
-                raise Fail("Unexpected enrichment topology; name=" + self.__params.enrichment_topology)
-
-            # start the topology
+            topology_flux = "{0}/flux/enrichment/remote.yaml".format(self.__params.metron_home)
+            topology_props = "{0}/config/enrichment.properties".format(self.__params.metron_home)
             start_cmd_template = """{0}/bin/start_enrichment_topology.sh --remote {1} --filter {2}"""
             Logger.info('Starting ' + self.__enrichment_topology)
             start_cmd = start_cmd_template.format(self.__params.metron_home, topology_flux, topology_props)
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
index ff89b6b..cd54a6d 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
@@ -38,16 +38,11 @@ class Enrichment(Script):
         env.set_params(params)
 
         Logger.info("Running enrichment configure")
-        File(format("{metron_config_path}/enrichment-splitjoin.properties"),
-             content=Template("enrichment-splitjoin.properties.j2"),
+        File(format("{metron_config_path}/enrichment.properties"),
+             content=Template("enrichment.properties.j2"),
              owner=params.metron_user,
              group=params.metron_group)
 
-        File(format("{metron_config_path}/enrichment-unified.properties"),
-            content=Template("enrichment-unified.properties.j2"),
-            owner=params.metron_user,
-            group=params.metron_group)
-
         if not metron_service.is_zk_configured(params):
             metron_service.init_zk_config(params)
             metron_service.set_zk_configured(params)
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index a7f20fc..09205bc 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -345,18 +345,6 @@ enrichment_topology_worker_childopts += config['configurations']['metron-enrichm
 enrichment_topology_max_spout_pending = config['configurations']['metron-enrichment-env']['enrichment_topology_max_spout_pending']
 enrichment_topology = config['configurations']['metron-enrichment-env']['enrichment_topology']
 
-# Enrichment - Split Join topology
-enrichment_join_cache_size = config['configurations']['metron-enrichment-env']['enrichment_join_cache_size']
-threatintel_join_cache_size = config['configurations']['metron-enrichment-env']['threatintel_join_cache_size']
-enrichment_kafka_spout_parallelism = config['configurations']['metron-enrichment-env']['enrichment_kafka_spout_parallelism']
-enrichment_split_parallelism = config['configurations']['metron-enrichment-env']['enrichment_split_parallelism']
-enrichment_stellar_parallelism = config['configurations']['metron-enrichment-env']['enrichment_stellar_parallelism']
-enrichment_join_parallelism = config['configurations']['metron-enrichment-env']['enrichment_join_parallelism']
-threat_intel_split_parallelism = config['configurations']['metron-enrichment-env']['threat_intel_split_parallelism']
-threat_intel_stellar_parallelism = config['configurations']['metron-enrichment-env']['threat_intel_stellar_parallelism']
-threat_intel_join_parallelism = config['configurations']['metron-enrichment-env']['threat_intel_join_parallelism']
-kafka_writer_parallelism = config['configurations']['metron-enrichment-env']['kafka_writer_parallelism']
-
 # Enrichment - Unified topology
 unified_kafka_spout_parallelism = config['configurations']['metron-enrichment-env']['unified_kafka_spout_parallelism']
 unified_enrichment_parallelism = config['configurations']['metron-enrichment-env']['unified_enrichment_parallelism']
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index a9b7322..41fd044 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -65,7 +65,7 @@
             "display-name": "Enrichment",
             "layout": {
               "tab-columns": "1",
-              "tab-rows": "5",
+              "tab-rows": "4",
               "sections": [
                 {
                   "name": "section-enrichment-adapters",
@@ -135,26 +135,7 @@
                   "subsections": [
                     {
                       "name": "subsection-enrichment-unified",
-                      "display-name": "Unified Topology",
-                      "row-index": "0",
-                      "column-index": "0",
-                      "row-span": "1",
-                      "column-span": "1"
-                    }
-                  ]
-                },
-                {
-                  "name": "section-enrichment-splitjoin",
-                  "row-index": "4",
-                  "column-index": "0",
-                  "row-span": "1",
-                  "column-span": "1",
-                  "section-columns": "1",
-                  "section-rows": "1",
-                  "subsections": [
-                    {
-                      "name": "subsection-enrichment-splitjoin",
-                      "display-name": "Split Join Topology",
+                      "display-name": "Topology",
                       "row-index": "0",
                       "column-index": "0",
                       "row-span": "1",
@@ -611,46 +592,6 @@
           "subsection-name": "subsection-enrichment-storm"
         },
         {
-          "config": "metron-enrichment-env/enrichment_join_cache_size",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/threatintel_join_cache_size",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/enrichment_kafka_spout_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/enrichment_split_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/enrichment_stellar_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/enrichment_join_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/threat_intel_split_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/threat_intel_stellar_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/threat_intel_join_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
-          "config": "metron-enrichment-env/kafka_writer_parallelism",
-          "subsection-name": "subsection-enrichment-splitjoin"
-        },
-        {
           "config": "metron-enrichment-env/unified_kafka_spout_parallelism",
           "subsection-name": "subsection-enrichment-unified"
         },
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 8cd9bef..e12f6cd 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -388,10 +388,8 @@ This package installs the Metron Enrichment Storm files
 %dir %{metron_home}/flux
 %dir %{metron_home}/flux/enrichment
 %{metron_home}/bin/start_enrichment_topology.sh
-%{metron_home}/config/enrichment-splitjoin.properties
-%{metron_home}/config/enrichment-unified.properties
-%{metron_home}/flux/enrichment/remote-splitjoin.yaml
-%{metron_home}/flux/enrichment/remote-unified.yaml
+%{metron_home}/config/enrichment.properties
+%{metron_home}/flux/enrichment/remote.yaml
 %attr(0644,root,root) %{metron_home}/lib/metron-enrichment-storm-%{full_version}-uber.jar
 
 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md
index 5203d5f..8829abb 100644
--- a/metron-platform/Performance-tuning-guide.md
+++ b/metron-platform/Performance-tuning-guide.md
@@ -188,15 +188,14 @@ See more detail on starting parsers [here](https://github.com/apache/metron/blob
 
 **Enrichment**
 
-__Note__ These recommendations are based on the deprecated split-join enrichment topology. See [Enrichment Performance](metron-enrichment/Performance.md) for tuning recommendations for the new default unified enrichment topology.
+See [Enrichment Performance](metron-enrichment/Performance.md) for tuning recommendations for the enrichment topology.
 
 This is a mapping of the various performance tuning properties for enrichments and how they are materialized.
 
-Flux file found here - $METRON_HOME/flux/enrichment/remote-splitjoin.yaml
+Flux file found here - $METRON_HOME/flux/enrichment/remote.yaml
 
 _Note 1:_ Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.
 
-_Note 2:_ Many of these settings will be irrelevant in the alternate non-split-join topology
 
 | Category                    | Ambari Property Name                       | enrichment.properties property                         | Flux Property                                          | Flux Section Location               | Storm Property Name             | Notes                                  |
 |-----------------------------|--------------------------------------------|--------------------------------------------------------|--------------------------------------------------------|-------------------------------------|---------------------------------|----------------------------------------|
@@ -209,11 +208,6 @@ _Note 2:_ Many of these settings will be irrelevant in the alternate non-split-j
 |                             | n/a                                        | n/a                                                    | setPollTimeoutMs                                       | line 230, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
 |                             | n/a                                        | n/a                                                    | setMaxUncommittedOffsets                               | line 230, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
 |                             | n/a                                        | n/a                                                    | setOffsetCommitPeriodMs                                | line 230, id: kafkaConfig           | n/a                             | Kafka consumer client property         |
-| Enrichment splitter         | enrichment_split_parallelism               | enrichment.split.parallelism                           | parallelism                                            | line 253, id: enrichmentSplitBolt   | n/a                             |                                        |
-| Enrichment joiner           | enrichment_join_parallelism                | enrichment.join.parallelism                            | parallelism                                            | line 316, id: enrichmentJoinBolt    | n/a                             |                                        |
-| Threat intel splitter       | threat_intel_split_parallelism             | threat.intel.split.parallelism                         | parallelism                                            | line 338, id: threatIntelSplitBolt  | n/a                             |                                        |
-| Threat intel joiner         | threat_intel_join_parallelism              | threat.intel.join.parallelism                          | parallelism                                            | line 376, id: threatIntelJoinBolt   | n/a                             |                                        |
-| Output bolt                 | kafka_writer_parallelism                   | kafka.writer.parallelism                               | parallelism                                            | line 397, id: outputBolt            | n/a                             |                                        |
 
 When adding Kafka spout properties, there are 3 ways you'll do this.
 
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/README.md b/metron-platform/metron-enrichment/metron-enrichment-storm/README.md
index 0766a29..47d5c35 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/README.md
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/README.md
@@ -43,7 +43,7 @@ to split across multiple bolts).
 
 There are two parameters which you might want to tune in this topology.
 Both of them are topology configuration adjustable in the flux file
-`$METRON_HOME/config/flux/enrichment/remote-unified.yaml`:
+`$METRON_HOME/config/flux/enrichment/remote.yaml`:
 * `metron.threadpool.size` : The size of the threadpool.  This can take a number or a multiple of the number of cores (e.g. `5C` to 5 times the number of cores).  The default is `2C`.
 * `metron.threadpool.type` : The type of threadpool. (note: descriptions taken from [here](https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/)).
    * `FIXED` is a fixed threadpool of size `n`. `n` threads will process tasks at the time, when the pool is saturated, new tasks will get added to a queue without a limit on size. Good for CPU intensive tasks.  This is the default.
@@ -54,19 +54,6 @@ intel bolt, the configurations will be taken from the respective join bolt
 parallelism.  When proper ambari support for this is added, we will add
 its own property.
 
-### Split-Join Enrichment Topology
-
-The now-deprecated split/join topology is also available and performs enrichments in parallel.
-This poses some issues in terms of ease of tuning and reasoning about performance.
-
-![Architecture](enrichment_arch.png)
-
-#### Using It
-
-In order to use the older, deprecated topology, you will need to
-* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-splitjoin.yaml` instead of `remote-unified.yaml`
-* Restart the enrichment topology.
-
 ## Enrichment Configuration
 
 The configuration for the `enrichment` topology, the topology primarily
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/enrichment_arch.png b/metron-platform/metron-enrichment/metron-enrichment-storm/enrichment_arch.png
deleted file mode 100644
index 3b8bcdb..0000000
Binary files a/metron-platform/metron-enrichment/metron-enrichment-storm/enrichment_arch.png and /dev/null differ
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties
deleted file mode 100644
index 109c2ee..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties
+++ /dev/null
@@ -1,63 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-##### Storm #####
-enrichment.workers=1
-enrichment.acker.executors=0
-topology.worker.childopts=
-topology.auto-credentials=
-topology.max.spout.pending=500
-
-##### Kafka #####
-kafka.zk=node1:2181
-kafka.broker=node1:6667
-kafka.security.protocol=PLAINTEXT
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start=UNCOMMITTED_EARLIEST
-
-enrichment.input.topic=enrichments
-enrichment.output.topic=indexing
-enrichment.error.topic=indexing
-threat.intel.error.topic=indexing
-
-##### JoinBolt #####
-enrichment.join.cache.size=100000
-threat.intel.join.cache.size=100000
-
-##### Enrichment #####
-hbase.provider.impl=org.apache.metron.hbase.HTableProvider
-enrichment.simple.hbase.table=enrichment
-enrichment.simple.hbase.cf=t
-enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
-{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
-{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
-
-##### Threat Intel #####
-threat.intel.tracker.table=access_tracker
-threat.intel.tracker.cf=t
-threat.intel.simple.hbase.table=threatintel
-threat.intel.simple.hbase.cf=t
-
-##### Parallelism #####
-kafka.spout.parallelism=1
-enrichment.split.parallelism=1
-enrichment.stellar.parallelism=1
-enrichment.join.parallelism=1
-threat.intel.split.parallelism=1
-threat.intel.stellar.parallelism=1
-threat.intel.join.parallelism=1
-kafka.writer.parallelism=1
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties.j2 b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties.j2
deleted file mode 100755
index a0b21c9..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-splitjoin.properties.j2
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-##### Storm #####
-enrichment.workers={{enrichment_workers}}
-enrichment.acker.executors={{enrichment_acker_executors}}
-topology.worker.childopts={{enrichment_topology_worker_childopts}}
-topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{enrichment_topology_max_spout_pending}}
-
-##### Kafka #####
-kafka.zk={{zookeeper_quorum}}
-kafka.broker={{kafka_brokers}}
-kafka.security.protocol={{kafka_security_protocol}}
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{enrichment_kafka_start}}
-
-enrichment.input.topic={{enrichment_input_topic}}
-enrichment.output.topic={{enrichment_output_topic}}
-enrichment.error.topic={{enrichment_error_topic}}
-threat.intel.error.topic={{threatintel_error_topic}}
-
-##### JoinBolt #####
-enrichment.join.cache.size={{enrichment_join_cache_size}}
-threat.intel.join.cache.size={{threatintel_join_cache_size}}
-
-##### Enrichment #####
-hbase.provider.impl={{enrichment_hbase_provider_impl}}
-enrichment.simple.hbase.table={{enrichment_hbase_table}}
-enrichment.simple.hbase.cf={{enrichment_hbase_cf}}
-enrichment.host.known_hosts={{enrichment_host_known_hosts}}
-
-##### Threat Intel #####
-threat.intel.tracker.table={{threatintel_hbase_table}}
-threat.intel.tracker.cf={{threatintel_hbase_cf}}
-threat.intel.simple.hbase.table={{threatintel_hbase_table}}
-threat.intel.simple.hbase.cf={{threatintel_hbase_cf}}
-
-##### Parallelism #####
-kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}}
-enrichment.split.parallelism={{enrichment_split_parallelism}}
-enrichment.stellar.parallelism={{enrichment_stellar_parallelism}}
-enrichment.join.parallelism={{enrichment_join_parallelism}}
-threat.intel.split.parallelism={{threat_intel_split_parallelism}}
-threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}}
-threat.intel.join.parallelism={{threat_intel_join_parallelism}}
-kafka.writer.parallelism={{kafka_writer_parallelism}}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties
similarity index 100%
rename from metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties
rename to metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties.j2 b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2
similarity index 100%
rename from metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment-unified.properties.j2
rename to metron-platform/metron-enrichment/metron-enrichment-storm/src/main/config/enrichment.properties.j2
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-splitjoin.yaml b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-splitjoin.yaml
deleted file mode 100644
index 8a1bba1..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-splitjoin.yaml
+++ /dev/null
@@ -1,593 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "enrichment"
-config:
-    topology.workers: ${enrichment.workers}
-    topology.acker.executors: ${enrichment.acker.executors}
-    topology.worker.childopts: ${topology.worker.childopts}
-    topology.auto-credentials: ${topology.auto-credentials}
-    topology.max.spout.pending: ${topology.max.spout.pending}
-
-components:
-
-# Enrichment
-    -   id: "stellarEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
-        configMethods:
-            -   name: "ofType"
-                args:
-                    - "ENRICHMENT"
-
-    # Any kafka props for the producer go here.
-    -   id: "kafkaWriterProps"
-        className: "java.util.HashMap"
-        configMethods:
-          -   name: "put"
-              args:
-                  - "security.protocol"
-                  - "${kafka.security.protocol}"
-
-    -   id: "stellarEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "stellar"
-            -   ref: "stellarEnrichmentAdapter"
-
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-    -   id: "geoEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "geo"
-            -   ref: "geoEnrichmentAdapter"
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
-        constructorArgs:
-            - '${enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "host"
-            -   ref: "hostEnrichmentAdapter"
-
-    -   id: "simpleHBaseEnrichmentConfig"
-        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${enrichment.simple.hbase.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${enrichment.simple.hbase.cf}"
-    -   id: "simpleHBaseEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "simpleHBaseEnrichmentConfig"
-    -   id: "simpleHBaseEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-          -   "hbaseEnrichment"
-          -   ref: "simpleHBaseEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "simpleHBaseEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "stellarEnrichment"
-
-    #enrichment error
-    -   id: "enrichmentErrorKafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${enrichment.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: 
-                    - ref: "kafkaWriterProps"
-
-# Threat Intel
-    -   id: "stellarThreatIntelAdapter"
-        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
-        configMethods:
-            -   name: "ofType"
-                args:
-                    - "THREAT_INTEL"
-    -   id: "stellarThreatIntelEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "stellar"
-            -   ref: "stellarThreatIntelAdapter"
-    -   id: "simpleHBaseThreatIntelConfig"
-        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withTrackerHBaseTable"
-                args:
-                    - "${threat.intel.tracker.table}"
-            -   name: "withTrackerHBaseCF"
-                args:
-                    - "${threat.intel.tracker.cf}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${threat.intel.simple.hbase.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${threat.intel.simple.hbase.cf}"
-    -   id: "simpleHBaseThreatIntelAdapter"
-        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "simpleHBaseThreatIntelConfig"
-    -   id: "simpleHBaseThreatIntelEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-          -   "hbaseThreatIntel"
-          -   ref: "simpleHBaseThreatIntelAdapter"
-
-    -   id: "threatIntels"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "simpleHBaseThreatIntelEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "stellarThreatIntelEnrichment"
-
-    #threatintel error
-    -   id: "threatIntelErrorKafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${threat.intel.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: 
-                    - ref: "kafkaWriterProps"
-#indexing
-    -   id: "kafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${enrichment.output.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: 
-                    - ref: "kafkaWriterProps"
-
-#kafka/zookeeper
-    # Any kafka props for the consumer go here.
-    -   id: "kafkaProps"
-        className: "java.util.HashMap"
-        configMethods:
-          -   name: "put"
-              args:
-                  - "value.deserializer"
-                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-          -   name: "put"
-              args:
-                  - "key.deserializer"
-                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-          -   name: "put"
-              args:
-                  - "group.id"
-                  - "enrichments"
-          -   name: "put"
-              args:
-                  - "security.protocol"
-                  - "${kafka.security.protocol}"
-
-
-  # The fields to pull out of the kafka messages
-    -   id: "fields"
-        className: "java.util.ArrayList"
-        configMethods:
-          -   name: "add"
-              args:
-                  - "value"
-
-    -   id: "kafkaConfig"
-        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
-        constructorArgs:
-          - ref: "kafkaProps"
-          # topic name
-          - "${enrichment.input.topic}"
-          - "${kafka.zk}"
-          - ref: "fields"
-        configMethods:
-            -   name: "setFirstPollOffsetStrategy"
-                args:
-                    - "${kafka.start}"
-
-
-spouts:
-    -   id: "kafkaSpout"
-        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
-        constructorArgs:
-            - ref: "kafkaConfig"
-        parallelism: ${kafka.spout.parallelism}
-
-bolts:
-# Enrichment Bolts
-    -   id: "enrichmentSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-        parallelism: ${enrichment.split.parallelism}
-
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "stellarEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "stellarEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${enrichment.stellar.parallelism}
-
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "simpleHBaseEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "simpleHBaseEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "enrichmentJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMaxCacheSize"
-                args: [${enrichment.join.cache.size}]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${enrichment.join.parallelism}
-
-    -   id: "enrichmentErrorOutputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-            - "ENRICHMENT"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "enrichmentErrorKafkaWriter"
-
-
-# Threat Intel Bolts
-    -   id: "threatIntelSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-            -   name: "withMessageFieldName"
-                args: ["message"]
-        parallelism: ${threat.intel.split.parallelism}
-
-    -   id: "simpleHBaseThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "simpleHBaseThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "stellarThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "stellarThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${threat.intel.stellar.parallelism}
-
-    -   id: "threatIntelJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMaxCacheSize"
-                args: [${threat.intel.join.cache.size}]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${threat.intel.join.parallelism}
-
-    -   id: "threatIntelErrorOutputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-            - "ENRICHMENT"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "threatIntelErrorKafkaWriter"
-
-# Indexing Bolts
-    -   id: "outputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-            - "ENRICHMENT"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "kafkaWriter"
-        parallelism: ${kafka.writer.parallelism}
-
-
-streams:
-#parser
-    -   name: "spout -> enrichmentSplit"
-        from: "kafkaSpout"
-        to: "enrichmentSplitBolt"
-        grouping:
-            type: LOCAL_OR_SHUFFLE
-
-#enrichment
-    -   name: "enrichmentSplit -> host"
-        from: "enrichmentSplitBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "enrichmentSplit -> geo"
-        from: "enrichmentSplitBolt"
-        to: "geoEnrichmentBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "enrichmentSplit -> stellar"
-        from: "enrichmentSplitBolt"
-        to: "stellarEnrichmentBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["message"]
-
-
-    -   name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
-        from: "enrichmentSplitBolt"
-        to: "simpleHBaseEnrichmentBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "splitter -> join"
-        from: "enrichmentSplitBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "geo -> join"
-        from: "geoEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "stellar -> join"
-        from: "stellarEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "simpleHBaseEnrichmentBolt -> join"
-        from: "simpleHBaseEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-
-    # Error output
-    -   name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "geoEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "stellarEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "hostEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "simpleHBaseEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-#threat intel
-    -   name: "enrichmentJoin -> threatSplit"
-        from: "enrichmentJoinBolt"
-        to: "threatIntelSplitBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatSplit -> simpleHBaseThreatIntel"
-        from: "threatIntelSplitBolt"
-        to: "simpleHBaseThreatIntelBolt"
-        grouping:
-            streamId: "hbaseThreatIntel"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "threatSplit -> stellarThreatIntel"
-        from: "threatIntelSplitBolt"
-        to: "stellarThreatIntelBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["message"]
-
-
-    -   name: "simpleHBaseThreatIntel -> join"
-        from: "simpleHBaseThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "hbaseThreatIntel"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "stellarThreatIntel -> join"
-        from: "stellarThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatIntelSplit -> threatIntelJoin"
-        from: "threatIntelSplitBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#output
-    -   name: "threatIntelJoin -> output"
-        from: "threatIntelJoinBolt"
-        to: "outputBolt"
-        grouping:
-            streamId: "message"
-            type: LOCAL_OR_SHUFFLE
-
-    # Error output
-    -   name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt"
-        from: "simpleHBaseThreatIntelBolt"
-        to: "threatIntelErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt"
-        from: "stellarThreatIntelBolt"
-        to: "threatIntelErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-unified.yaml b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml
similarity index 100%
rename from metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote-unified.yaml
rename to metron-platform/metron-enrichment/metron-enrichment-storm/src/main/flux/enrichment/remote.yaml
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
deleted file mode 100644
index 671e6b8..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.storm.task.TopologyContext;
-import com.google.common.base.Joiner;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class EnrichmentJoinBolt extends JoinBolt<JSONObject> {
-
-  protected static final Logger LOG = LoggerFactory
-          .getLogger(EnrichmentJoinBolt.class);
-
-  public EnrichmentJoinBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext) {
-
-  }
-
-  @Override
-  public Set<String> getStreamIds(JSONObject message) {
-    Set<String> streamIds = new HashSet<>();
-    String sourceType = MessageUtils.getSensorType(message);
-    if(sourceType == null) {
-      String errorMessage = "Unable to find source type for message: " + message;
-      throw new IllegalStateException(errorMessage);
-    }
-    Map<String, Object>  fieldMap = getFieldMap(sourceType);
-    Map<String, ConfigHandler> handlerMap = getFieldToHandlerMap(sourceType);
-    if(fieldMap != null) {
-      for (String enrichmentType : fieldMap.keySet()) {
-        ConfigHandler handler = handlerMap.get(enrichmentType);
-        List<String> subgroups = handler.getType().getSubgroups(handler.getType().toConfig(handler.getConfig()));
-        for(String subgroup : subgroups) {
-          streamIds.add(Joiner.on(":").join(enrichmentType, subgroup));
-        }
-      }
-    }
-    streamIds.add("message:");
-    return streamIds;
-  }
-
-
-  @Override
-  public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
-    JSONObject message = new JSONObject();
-    for (String key : streamMessageMap.keySet()) {
-      Tuple tuple = streamMessageMap.get(key);
-      JSONObject obj = (JSONObject) messageGetStrategy.get(tuple);
-      message.putAll(obj);
-    }
-    List<Object> emptyKeys = new ArrayList<>();
-    for(Object key : message.keySet()) {
-      Object value = message.get(key);
-      if(value == null || value.toString().length() == 0) {
-        emptyKeys.add(key);
-      }
-    }
-    for(Object o : emptyKeys) {
-      message.remove(o);
-    }
-    message.put(getClass().getSimpleName().toLowerCase() + ".joiner.ts", "" + System.currentTimeMillis());
-    return  message;
-  }
-
- protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
-    if(sensorType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
-      if (config != null) {
-        return config.getEnrichment().getEnrichmentConfigs();
-      } else {
-        LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
-      }
-    } else {
-      LOG.error("Trying to retrieve a field map with sensor type of null");
-    }
-    return new HashMap<>();
-  }
-
-  public Map<String, Object> getFieldMap(String sourceType) {
-    if(sourceType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
-      if (config != null && config.getEnrichment() != null) {
-        return config.getEnrichment().getFieldMap();
-      }
-      else {
-        LOG.debug("Unable to retrieve a sensor enrichment config of {}", sourceType);
-      }
-    }
-    else {
-      LOG.error("Trying to retrieve a field map with source type of null");
-    }
-    return null;
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
deleted file mode 100644
index 3298c76..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import java.io.UnsupportedEncodingException;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.metron.enrichment.configuration.Enrichment;
-import org.apache.metron.enrichment.utils.EnrichmentUtils;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private List<Enrichment> enrichments;
-  protected String messageFieldName;
-  private transient JSONParser parser;
-
-
-  public EnrichmentSplitterBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  public EnrichmentSplitterBolt withEnrichments(List<Enrichment> enrichments) {
-    this.enrichments = enrichments;
-    return this;
-  }
-
-  public EnrichmentSplitterBolt withMessageFieldName(String messageFieldName) {
-    this.messageFieldName = messageFieldName;
-    return this;
-  }
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext) {
-    parser = new JSONParser();
-  }
-  @Override
-  public String getKey(Tuple tuple, JSONObject message) {
-    String key = null, guid = null;
-    try {
-      key = tuple.getStringByField("key");
-      guid = (String)message.get(Constants.GUID);
-    }
-    catch(Throwable t) {
-      //swallowing this just in case.
-    }
-    if(key != null) {
-      return key;
-    }
-    else if(guid != null) {
-      return guid;
-    }
-    else {
-      return UUID.randomUUID().toString();
-    }
-  }
-
-  @Override
-  public JSONObject generateMessage(Tuple tuple) {
-    JSONObject message = null;
-    if (messageFieldName == null) {
-      byte[] data = tuple.getBinary(0);
-      try {
-        message = (JSONObject) parser.parse(new String(data, "UTF8"));
-        message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
-      } catch (ParseException | UnsupportedEncodingException e) {
-        e.printStackTrace();
-      }
-    } else {
-      message = (JSONObject) tuple.getValueByField(messageFieldName);
-      message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
-    }
-    return message;
-  }
-
-  @Override
-  public Set<String> getStreamIds() {
-    Set<String> streamIds = new HashSet<>();
-    for(Enrichment enrichment: enrichments) {
-      streamIds.add(enrichment.getType());
-    }
-    return streamIds;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public Map<String, List<JSONObject>> splitMessage(JSONObject message) {
-    Map<String, List<JSONObject>> streamMessageMap = new HashMap<>();
-    String sensorType = MessageUtils.getSensorType(message);
-    Map<String, Object> enrichmentFieldMap = getFieldMap(sensorType);
-    Map<String, ConfigHandler> fieldToHandler = getFieldToHandlerMap(sensorType);
-    Set<String> enrichmentTypes = new HashSet<>(enrichmentFieldMap.keySet());
-    enrichmentTypes.addAll(fieldToHandler.keySet());
-    for (String enrichmentType : enrichmentTypes) {
-      Object fields = enrichmentFieldMap.get(enrichmentType);
-      ConfigHandler retriever = fieldToHandler.get(enrichmentType);
-
-      List<JSONObject> enrichmentObject = retriever.getType()
-              .splitByFields( message
-                      , fields
-                      , field -> getKeyName(enrichmentType, field)
-                      , retriever
-              );
-      for(JSONObject eo : enrichmentObject) {
-        eo.put(Constants.SENSOR_TYPE, sensorType);
-      }
-      streamMessageMap.put(enrichmentType, enrichmentObject);
-    }
-    message.put(getClass().getSimpleName().toLowerCase() + ".splitter.end.ts", "" + System.currentTimeMillis());
-    return streamMessageMap;
-  }
-
-  protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
-    if(sensorType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
-      if (config != null) {
-        return config.getEnrichment().getEnrichmentConfigs();
-      } else {
-        LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
-      }
-    } else {
-      LOG.error("Trying to retrieve a field map with sensor type of null");
-    }
-    return new HashMap<>();
-  }
-  protected Map<String, Object > getFieldMap(String sensorType) {
-    if(sensorType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
-      if (config != null) {
-        return config.getEnrichment().getFieldMap();
-      } else {
-        LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
-      }
-    } else {
-      LOG.error("Trying to retrieve a field map with sensor type of null");
-    }
-    return new HashMap<>();
-  }
-
-  protected String getKeyName(String type, String field) {
-    return EnrichmentUtils.getEnrichmentKey(type, field);
-  }
-
-  @Override
-  public void declareOther(OutputFieldsDeclarer declarer) {
-
-  }
-
-  @Override
-  public void emitOther(Tuple tuple, JSONObject message) {
-
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
deleted file mode 100644
index ac6a1cf..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import com.github.benmanes.caffeine.cache.CacheLoader;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.github.benmanes.caffeine.cache.RemovalCause;
-import com.github.benmanes.caffeine.cache.RemovalListener;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.metron.common.Constants;
-import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.storm.common.message.MessageGetters;
-import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.metron.storm.common.utils.StormErrorUtils;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
-
-  public static class Perf {} // used for performance logging
-  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  protected OutputCollector collector;
-
-  protected transient CacheLoader<String, Map<String, Tuple>> loader;
-  protected transient LoadingCache<String, Map<String, Tuple>> cache;
-  protected transient MessageGetStrategy keyGetStrategy;
-  protected transient MessageGetStrategy subgroupGetStrategy;
-  protected transient MessageGetStrategy messageGetStrategy;
-  protected Long maxCacheSize;
-  protected Long maxTimeRetain;
-
-  public JoinBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  public JoinBolt withMaxCacheSize(long maxCacheSize) {
-    this.maxCacheSize = maxCacheSize;
-    return this;
-  }
-
-  public JoinBolt withMaxTimeRetain(long maxTimeRetain) {
-    this.maxTimeRetain = maxTimeRetain;
-    return this;
-  }
-
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
-    super.prepare(map, topologyContext, outputCollector);
-    perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName());
-    keyGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("key");
-    subgroupGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("subgroup");
-    messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("message");
-    this.collector = outputCollector;
-    if (this.maxCacheSize == null) {
-      throw new IllegalStateException("maxCacheSize must be specified");
-    }
-    if (this.maxTimeRetain == null) {
-      throw new IllegalStateException("maxTimeRetain must be specified");
-    }
-    loader = s -> new HashMap<>();
-    cache = Caffeine.newBuilder().maximumSize(maxCacheSize)
-                         .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
-                         .removalListener(new JoinRemoveListener())
-                         .build(loader);
-    prepare(map, topologyContext);
-  }
-
-  class JoinRemoveListener implements RemovalListener<String, Map<String, Tuple>> {
-
-    @Override
-    public void onRemoval(@Nullable String s, @Nullable Map<String, Tuple> stringTupleMap, @Nonnull RemovalCause removalCause) {
-      if (removalCause == RemovalCause.SIZE) {
-        String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt.";
-        Exception exception = new Exception(errorMessage);
-        LOG.error(errorMessage, exception);
-        collector.reportError(exception);
-      }
-      if (removalCause == RemovalCause.EXPIRED) {
-        String errorMessage = "Message was in the join cache too long which may be caused by slow enrichments/threatintels.  Increase the maxTimeRetain setting.";
-        Exception exception = new Exception(errorMessage);
-        LOG.error(errorMessage, exception);
-        collector.reportError(exception);
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void execute(Tuple tuple) {
-    perfLog.mark("execute");
-    String streamId = tuple.getSourceStreamId();
-    String key = (String) keyGetStrategy.get(tuple);
-    String subgroup = (String) subgroupGetStrategy.get(tuple);
-    streamId = Joiner.on(":").join("" + streamId, subgroup == null?"":subgroup);
-    V message = (V) messageGetStrategy.get(tuple);
-    try {
-      Map<String, Tuple> streamMessageMap = cache.get(key);
-      if (streamMessageMap.containsKey(streamId)) {
-        LOG.warn("Received key {} twice for stream {}", key, streamId);
-      }
-      streamMessageMap.put(streamId, tuple);
-      Set<String> streamIds = getStreamIds(message);
-      Set<String> streamMessageKeys = streamMessageMap.keySet();
-      if ( streamMessageKeys.size() == streamIds.size()
-        && Sets.symmetricDifference(streamMessageKeys, streamIds)
-               .isEmpty()
-         ) {
-
-        perfLog.mark("join-message");
-        V joinedMessages = joinMessages(streamMessageMap, this.messageGetStrategy);
-        perfLog.log("join-message", "key={}, elapsed time to join messages", key);
-
-        perfLog.mark("emit-message");
-        collector.emit("message",
-                       tuple,
-                       new Values(key, joinedMessages));
-        perfLog.log("emit-message", "key={}, elapsed time to emit messages", key);
-
-        cache.invalidate(key);
-        Tuple messageTuple = streamMessageMap.get("message:");
-        collector.ack(messageTuple);
-        LOG.trace("Emitted message for key: {}", key);
-      } else {
-        cache.put(key, streamMessageMap);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("{}: Missed joining portions for {}. Expected {} != {}",
-              getClass().getSimpleName(), key, Joiner.on(",").join(streamIds),
-              Joiner.on(",").join(streamMessageKeys));
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("[Metron] Unable to join messages: {}", message, e);
-      MetronError error = new MetronError()
-              .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
-              .withMessage("Joining problem: " + message)
-              .withThrowable(e)
-              .addRawMessage(message);
-      StormErrorUtils.handleError(collector, error);
-      collector.ack(tuple);
-    }
-    perfLog.log("execute", "key={}, elapsed time to run execute", key);
-  }
-
-  @Override
-  public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream("message", new Fields("key", "message"));
-    declarer.declareStream("error", new Fields("message"));
-  }
-
-  public abstract void prepare(Map map, TopologyContext topologyContext);
-
-  public abstract Set<String> getStreamIds(V value);
-
-  public abstract V joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy);
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
deleted file mode 100644
index 48cec0b..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.metron.storm.common.bolt.ConfiguredEnrichmentBolt;
-import org.apache.metron.common.performance.PerformanceLogger;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public abstract class SplitBolt<T extends Cloneable> extends ConfiguredEnrichmentBolt {
-  public static class Perf {} // used for performance logging
-  private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
-
-  protected OutputCollector collector;
-
-  public SplitBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @Override
-  public final void prepare(Map map, TopologyContext topologyContext,
-                       OutputCollector outputCollector) {
-    super.prepare(map, topologyContext, outputCollector);
-    perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName());
-    collector = outputCollector;
-    prepare(map, topologyContext);
-  }
-
-  @Override
-  public final void execute(Tuple tuple) {
-    emit(tuple, generateMessage(tuple));
-  }
-
-  @Override
-  public final void declareOutputFields(OutputFieldsDeclarer declarer) {
-    declarer.declareStream("message", new Fields("key", "message", "subgroup"));
-    for (String streamId : getStreamIds()) {
-      declarer.declareStream(streamId, new Fields("key", "message"));
-    }
-    declarer.declareStream("error", new Fields("message"));
-    declareOther(declarer);
-  }
-
-  public void emit(Tuple tuple, T message) {
-    perfLog.mark("emit");
-    if (message == null) return;
-    String key = getKey(tuple, message);
-
-    perfLog.mark("split-message");
-    Map<String, List<T>> streamMessageMap = splitMessage(message);
-    perfLog.log("split-message", "key={}, elapsed time to split message", key);
-
-    for (String streamId : streamMessageMap.keySet()) {
-      List<T> streamMessages = streamMessageMap.get(streamId);
-      if (streamMessages != null) {
-        for (T streamMessage : streamMessages) {
-          if (streamMessage == null) {
-            streamMessage = getDefaultMessage(streamId);
-          }
-          collector.emit(streamId, new Values(key, streamMessage));
-        }
-      } else {
-        throw new IllegalArgumentException("Enrichment must send some list of messages, not null.");
-      }
-    }
-    collector.emit("message", tuple, new Values(key, message, ""));
-    collector.ack(tuple);
-    emitOther(tuple, message);
-    perfLog.log("emit", "key={}, elapsed time to run emit", key);
-  }
-
-  protected T getDefaultMessage(String streamId) {
-    throw new IllegalArgumentException("Could not find a message for stream: " + streamId);
-  }
-
-  public abstract void prepare(Map map, TopologyContext topologyContext);
-
-  public abstract Set<String> getStreamIds();
-
-  public abstract String getKey(Tuple tuple, T message);
-
-  public abstract T generateMessage(Tuple tuple);
-
-  public abstract Map<String, List<T>> splitMessage(T message);
-
-  public abstract void declareOther(OutputFieldsDeclarer declarer);
-
-  public abstract void emitOther(Tuple tuple, T message);
-
-
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
deleted file mode 100644
index 2b19375..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.common.utils.MessageUtils;
-import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
-import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
-import org.apache.metron.enrichment.utils.ThreatIntelUtils;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-
-  /**
-   * The Stellar function resolver.
-   */
-  private FunctionResolver functionResolver;
-
-  /**
-   * The execution context for Stellar.
-   */
-  private Context stellarContext;
-
-  public ThreatIntelJoinBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @Override
-  protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
-    if(sensorType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
-      if (config != null) {
-        return config.getThreatIntel().getEnrichmentConfigs();
-      } else {
-        LOG.debug("Unable to retrieve a sensor enrichment config of {}", sensorType);
-      }
-    } else {
-      LOG.error("Trying to retrieve a field map with sensor type of null");
-    }
-    return new HashMap<>();
-  }
-
-  @Override
-  public void prepare(Map map, TopologyContext topologyContext) {
-    super.prepare(map, topologyContext);
-    GeoLiteCityDatabase.INSTANCE.update((String)getConfigurations().getGlobalConfig().get(
-        GeoLiteCityDatabase.GEO_HDFS_FILE));
-    GeoLiteAsnDatabase.INSTANCE.update((String)getConfigurations().getGlobalConfig().get(
-        GeoLiteAsnDatabase.ASN_HDFS_FILE));
-    initializeStellar();
-  }
-
-  protected void initializeStellar() {
-    this.stellarContext = new Context.Builder()
-                                .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-                                .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .build();
-    StellarFunctions.initialize(stellarContext);
-    this.functionResolver = StellarFunctions.FUNCTION_RESOLVER();
-  }
-
-  @Override
-  public Map<String, Object> getFieldMap(String sourceType) {
-    SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
-    if(config != null) {
-      return config.getThreatIntel().getFieldMap();
-    }
-    else {
-      LOG.debug("Unable to retrieve sensor config: {}", sourceType);
-      return null;
-    }
-  }
-
-
-  @Override
-  public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
-    JSONObject ret = super.joinMessages(streamMessageMap, messageGetStrategy);
-    String sourceType = MessageUtils.getSensorType(ret);
-    return ThreatIntelUtils.triage(ret, getConfigurations().getSensorEnrichmentConfig(sourceType), functionResolver, stellarContext);
-  }
-
-  @Override
-  public void reloadCallback(String name, ConfigurationType type) {
-    super.reloadCallback(name, type);
-    if(type == ConfigurationType.GLOBAL) {
-      GeoLiteCityDatabase.INSTANCE.updateIfNecessary(getConfigurations().getGlobalConfig());
-    }
-  }
-
-
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
deleted file mode 100644
index 76c65c6..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBolt.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler;
-import org.apache.metron.enrichment.utils.ThreatIntelUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ThreatIntelSplitterBolt extends EnrichmentSplitterBolt {
-
-  public ThreatIntelSplitterBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @Override
-  protected Map<String, ConfigHandler> getFieldToHandlerMap(String sensorType) {
-    if(sensorType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
-      if (config != null) {
-        return config.getThreatIntel().getEnrichmentConfigs();
-      } else {
-        LOG.debug("Unable to retrieve a sensor config of {}", sensorType);
-      }
-    } else {
-      LOG.error("Trying to retrieve a field map with sensor type of null");
-    }
-    return new HashMap<>();
-  }
-
-  @Override
-  protected Map<String, Object> getFieldMap(String sensorType) {
-    if (sensorType != null) {
-      SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sensorType);
-      if (config != null) {
-        return config.getThreatIntel().getFieldMap();
-      } else {
-        LOG.debug("Unable to retrieve sensor config: {}", sensorType);
-      }
-    } else {
-      LOG.error("Trying to retrieve a field map with sensor type of null");
-    }
-    return new HashMap<>();
-  }
-
-  @Override
-  protected String getKeyName(String type, String field) {
-    return ThreatIntelUtils.getThreatIntelKey(type, field);
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh
index d3ed8ad..d9c2dc4 100755
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/main/scripts/start_enrichment_topology.sh
@@ -19,12 +19,9 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-
-# There are two enrichment topologies. By default, the unified enrichment topology is executed. Split-join is now deprecated.
-SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml --filter $METRON_HOME/config/enrichment-splitjoin.properties"
-UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml --filter $METRON_HOME/config/enrichment-unified.properties"
+DEFAULT_ARGS="--remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/enrichment.properties"
 
 # by passing in different args, the user can execute an alternative enrichment topology
-ARGS=${@:-$UNIFIED_ARGS}
+ARGS=${@:-$DEFAULT_ARGS}
 
 storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
deleted file mode 100644
index ed623f3..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest {
-
-  private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
-  /**
-   * {
-   * "enrichedField": "enrichedValue",
-   * "emptyEnrichedField": ""
-   * }
-   */
-  @Multiline
-  private String enrichedMessageString;
-
-  /**
-   * {
-   * "ip_src_addr": "ip1",
-   * "ip_dst_addr": "ip2",
-   * "source.type": "test",
-   * "enrichedField": "enrichedValue"
-   * }
-   */
-  @Multiline
-  private String expectedJoinedMessageString;
-
-  private JSONObject enrichedMessage;
-  private JSONObject expectedJoinedMessage;
-
-  @Before
-  public void parseMessages() throws ParseException {
-    JSONParser parser = new JSONParser();
-    enrichedMessage = (JSONObject) parser.parse(enrichedMessageString);
-    expectedJoinedMessage = (JSONObject) parser.parse(expectedJoinedMessageString);
-  }
-
-  @Test
-  public void test() throws IOException {
-    EnrichmentJoinBolt enrichmentJoinBolt = new EnrichmentJoinBolt("zookeeperUrl");
-    enrichmentJoinBolt.setCuratorFramework(client);
-    enrichmentJoinBolt.setZKCache(cache);
-    enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(enrichmentConfigPath));
-    enrichmentJoinBolt.withMaxCacheSize(100);
-    enrichmentJoinBolt.withMaxTimeRetain(10000);
-    enrichmentJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-    Set<String> actualStreamIds = enrichmentJoinBolt.getStreamIds(sampleMessage);
-    Assert.assertEquals(joinStreamIds, actualStreamIds);
-    Map<String, Tuple> streamMessageMap = new HashMap<>();
-    MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class);
-    Tuple sampleTuple = mock(Tuple.class);
-    when(messageGetStrategy.get(sampleTuple)).thenReturn(sampleMessage);
-    Tuple enrichedTuple = mock(Tuple.class);
-    when(messageGetStrategy.get(enrichedTuple)).thenReturn(enrichedMessage);
-    streamMessageMap.put("message", sampleTuple);
-    streamMessageMap.put("enriched", enrichedTuple);
-    JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
-    removeTimingFields(joinedMessage);
-    Assert.assertEquals(expectedJoinedMessage, joinedMessage);
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
deleted file mode 100644
index 1e9177b..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.metron.enrichment.configuration.Enrichment;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.ParseException;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class EnrichmentSplitterBoltTest extends BaseEnrichmentBoltTest {
-
-  private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
-  @Test
-  public void test() throws ParseException, IOException {
-    final Enrichment geo = new Enrichment();
-    geo.setType("geo");
-    final Enrichment host = new Enrichment();
-    host.setType("host");
-    final Enrichment hbaseEnrichment = new Enrichment();
-    hbaseEnrichment.setType("hbaseEnrichment");
-    final Enrichment stellarEnrichment = new Enrichment();
-    stellarEnrichment.setType("stellar");
-    List<Enrichment> enrichments = new ArrayList<Enrichment>() {{
-      add(geo);
-      add(host);
-      add(hbaseEnrichment);
-      add(stellarEnrichment);
-    }};
-
-    EnrichmentSplitterBolt enrichmentSplitterBolt = new EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments);
-    enrichmentSplitterBolt.setCuratorFramework(client);
-    enrichmentSplitterBolt.setZKCache(cache);
-    enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(enrichmentConfigPath));
-    enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
-    String key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
-    Assert.assertTrue(key != null && key.length() == 36);
-    String someKey = "someKey";
-    when(tuple.getStringByField("key")).thenReturn(someKey);
-    key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
-    Assert.assertEquals(someKey, key);
-    String guid = "sample-guid";
-    when(sampleMessage.get("guid")).thenReturn(guid);
-    key = enrichmentSplitterBolt.getKey(tuple, sampleMessage);
-    Assert.assertEquals(guid, key);
-    when(tuple.getBinary(0)).thenReturn(sampleMessageString.getBytes());
-    JSONObject generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
-    removeTimingFields(generatedMessage);
-    Assert.assertEquals(sampleMessage, generatedMessage);
-    String messageFieldName = "messageFieldName";
-    enrichmentSplitterBolt.withMessageFieldName(messageFieldName);
-    when(tuple.getValueByField(messageFieldName)).thenReturn(sampleMessage);
-    generatedMessage = enrichmentSplitterBolt.generateMessage(tuple);
-    Assert.assertEquals(sampleMessage, generatedMessage);
-    Set<String> actualStreamIds = enrichmentSplitterBolt.getStreamIds();
-    Assert.assertEquals(streamIds, actualStreamIds);
-
-    Map<String, List<JSONObject> > actualSplitMessages = enrichmentSplitterBolt.splitMessage(sampleMessage);
-    Assert.assertEquals(enrichments.size(), actualSplitMessages.size());
-    Assert.assertEquals(ImmutableList.of(geoMessage), actualSplitMessages.get("geo"));
-    Assert.assertEquals(ImmutableList.of(hostMessage), actualSplitMessages.get("host"));
-    Assert.assertEquals(ImmutableList.of(hbaseEnrichmentMessage), actualSplitMessages.get("hbaseEnrichment"));
-
-
-  }
-
-  @Override
-  public void removeTimingFields(JSONObject message) {
-    ImmutableSet keys = ImmutableSet.copyOf(message.keySet());
-    for(Object key: keys) {
-      if (key.toString().contains("splitter.begin.ts")) {
-        message.remove(key);
-      }
-    }
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
deleted file mode 100644
index 8bdf409..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-public class JoinBoltTest extends BaseEnrichmentBoltTest {
-
-  public class StandAloneJoinBolt extends JoinBolt<JSONObject> {
-
-    public StandAloneJoinBolt(String zookeeperUrl) {
-      super(zookeeperUrl);
-    }
-
-    @Override
-    public void prepare(Map map, TopologyContext topologyContext) {
-
-    }
-
-    @Override
-    public Set<String> getStreamIds(JSONObject value) {
-      HashSet<String> ret = new HashSet<>();
-      for(String s : streamIds) {
-        ret.add(s + ":");
-      }
-      ret.add("message:");
-      return ret;
-    }
-
-    @Override
-    public JSONObject joinMessages(Map<String, Tuple> streamMessageMap, MessageGetStrategy messageGetStrategy) {
-      return joinedMessage;
-    }
-  }
-
-  /**
-   {
-   "joinField": "joinValue"
-   }
-   */
-  @Multiline
-  private String joinedMessageString;
-
-  private JSONObject joinedMessage;
-  private JoinBolt<JSONObject> joinBolt;
-
-  @Before
-  public void parseMessages() {
-    JSONParser parser = new JSONParser();
-    try {
-      joinedMessage = (JSONObject) parser.parse(joinedMessageString);
-    } catch (ParseException e) {
-      e.printStackTrace();
-    }
-    joinBolt = new StandAloneJoinBolt("zookeeperUrl");
-    joinBolt.setCuratorFramework(client);
-    joinBolt.setZKCache(cache);
-  }
-
-  @Test
-  public void testPrepare() {
-    try {
-      joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
-      fail("Should fail if a maxCacheSize property is not set");
-    } catch(IllegalStateException e) {}
-    joinBolt.withMaxCacheSize(100);
-    try {
-      joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
-      fail("Should fail if a maxTimeRetain property is not set");
-    } catch(IllegalStateException e) {}
-    joinBolt.withMaxTimeRetain(10000);
-    joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
-  }
-
-  @Test
-  public void testDeclareOutputFields() {
-    joinBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message")));
-    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
-    verifyNoMoreInteractions(declarer);
-  }
-
-  @Test
-  public void testExecute() {
-    joinBolt.withMaxCacheSize(100);
-    joinBolt.withMaxTimeRetain(10000);
-    joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
-
-    Tuple geoTuple = mock(Tuple.class);
-    when(geoTuple.getValueByField("key")).thenReturn(key);
-    when(geoTuple.getSourceStreamId()).thenReturn("geo");
-    when(geoTuple.getValueByField("message")).thenReturn(geoMessage);
-    joinBolt.execute(geoTuple);
-
-    Tuple messageTuple = mock(Tuple.class);
-    when(messageTuple.getValueByField("key")).thenReturn(key);
-    when(messageTuple.getSourceStreamId()).thenReturn("message");
-    when(messageTuple.getValueByField("message")).thenReturn(sampleMessage);
-    joinBolt.execute(messageTuple);
-
-    Tuple hostTuple = mock(Tuple.class);
-    when(hostTuple.getValueByField("key")).thenReturn(key);
-    when(hostTuple.getSourceStreamId()).thenReturn("host");
-    when(hostTuple.getValueByField("message")).thenReturn(hostMessage);
-    joinBolt.execute(hostTuple);
-
-    Tuple hbaseEnrichmentTuple = mock(Tuple.class);
-    when(hbaseEnrichmentTuple.getValueByField("key")).thenReturn(key);
-    when(hbaseEnrichmentTuple.getSourceStreamId()).thenReturn("hbaseEnrichment");
-    when(hbaseEnrichmentTuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage);
-    joinBolt.execute(hbaseEnrichmentTuple);
-
-    Tuple stellarTuple = mock(Tuple.class);
-    when(stellarTuple.getValueByField("key")).thenReturn(key);
-    when(stellarTuple.getSourceStreamId()).thenReturn("stellar");
-    when(stellarTuple.getValueByField("message")).thenReturn(new JSONObject());
-    joinBolt.execute(stellarTuple);
-
-    verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage)));
-    verify(outputCollector, times(1)).ack(messageTuple);
-
-    verifyNoMoreInteractions(outputCollector);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testExecuteShouldReportError() throws ExecutionException {
-    joinBolt.withMaxCacheSize(100);
-    joinBolt.withMaxTimeRetain(10000);
-    joinBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    when(tuple.getValueByField("key")).thenReturn(key);
-    when(tuple.getValueByField("message")).thenReturn(new JSONObject());
-    joinBolt.cache = mock(LoadingCache.class);
-    when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception")));
-
-    joinBolt.execute(tuple);
-    RuntimeException expectedExecutionException = new RuntimeException(new Exception("join exception"));
-    MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
-            .withMessage("Joining problem: {}")
-            .withThrowable(expectedExecutionException)
-            .addRawMessage(new JSONObject());
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
-    verify(outputCollector, times(1)).reportError(any(ExecutionException.class));
-    verify(outputCollector, times(1)).ack(eq(tuple));
-    verifyNoMoreInteractions(outputCollector);
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
deleted file mode 100644
index 57dae13..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import com.google.common.collect.ImmutableList;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.json.simple.JSONObject;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
-public class SplitBoltTest extends BaseEnrichmentBoltTest {
-
-  public class StandAloneSplitBolt extends SplitBolt<JSONObject> {
-
-    public StandAloneSplitBolt(String zookeeperUrl) {
-      super(zookeeperUrl);
-    }
-
-
-    @Override
-    public void prepare(Map map, TopologyContext topologyContext) {
-
-    }
-
-    @Override
-    public Set<String> getStreamIds() {
-      return streamIds;
-    }
-
-    @Override
-    public String getKey(Tuple tuple, JSONObject message) {
-      return key;
-    }
-
-    @Override
-    public JSONObject generateMessage(Tuple tuple) {
-      return sampleMessage;
-    }
-
-    @Override
-    public Map<String, List<JSONObject>> splitMessage(JSONObject message) {
-      return null;
-    }
-
-    @Override
-    public void declareOther(OutputFieldsDeclarer declarer) {
-
-    }
-
-    @Override
-    public void emitOther(Tuple tuple, JSONObject message) {
-
-    }
-  }
-
-  @Test
-  public void test() {
-    StandAloneSplitBolt splitBolt = spy(new StandAloneSplitBolt("zookeeperUrl"));
-    splitBolt.setCuratorFramework(client);
-    splitBolt.setZKCache(cache);
-    doCallRealMethod().when(splitBolt).reloadCallback(anyString(), any(ConfigurationType.class));
-    splitBolt.prepare(new HashMap(), topologyContext, outputCollector);
-    splitBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message", "subgroup")));
-    for(String streamId: streamIds) {
-      verify(declarer, times(1)).declareStream(eq(streamId), argThat(new FieldsMatcher("key", "message")));
-    }
-    verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
-
-    JSONObject sampleMessage = splitBolt.generateMessage(tuple);
-    Map<String, List<JSONObject>> streamMessageMap = new HashMap<>();
-    streamMessageMap.put("geo", ImmutableList.of(geoMessage));
-    streamMessageMap.put("host", ImmutableList.of(hostMessage));
-    streamMessageMap.put("hbaseEnrichment", ImmutableList.of(hbaseEnrichmentMessage));
-    doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
-    splitBolt.execute(tuple);
-    verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, sampleMessage, "")));
-    verify(outputCollector, times(1)).emit(eq("geo"), eq(new Values(key, geoMessage)));
-    verify(outputCollector, times(1)).emit(eq("host"), eq(new Values(key, hostMessage)));
-    verify(outputCollector, times(1)).emit(eq("hbaseEnrichment"), eq(new Values(key, hbaseEnrichmentMessage)));
-    verify(outputCollector, times(1)).ack(tuple);
-    streamMessageMap = new HashMap<>();
-    streamMessageMap.put("host", null);
-    doReturn(streamMessageMap).when(splitBolt).splitMessage(sampleMessage);
-    try {
-      splitBolt.execute(tuple);
-      Assert.fail("An exception should be thrown when splitMessage produces a null value for a stream");
-    }catch (IllegalArgumentException e) {}
-  }
-
-
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
deleted file mode 100644
index aeafede..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.fasterxml.jackson.databind.JsonMappingException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
-import org.apache.metron.storm.common.message.MessageGetStrategy;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
-import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.apache.storm.tuple.Tuple;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest {
-
-  private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
-  /**
-   * {
-   * "field1": "value1",
-   * "enrichedField1": "enrichedValue1",
-   * "source.type": "test"
-   * }
-   */
-  @Multiline
-  private String messageString;
-
-  /**
-   * {
-   * "field1": "value1",
-   * "enrichedField1": "enrichedValue1",
-   * "source.type": "test",
-   * "threatintels.field.end.ts": "timing"
-   * }
-   */
-  @Multiline
-  private String messageWithTimingString;
-
-  /**
-   * {
-   * "field1": "value1",
-   * "enrichedField1": "enrichedValue1",
-   * "source.type": "test",
-   * "threatintels.field": "threatIntelValue"
-   * }
-   */
-  @Multiline
-  private String alertMessageString;
-
-  private JSONObject message;
-  private JSONObject messageWithTiming;
-  private JSONObject alertMessage;
-
-  @Before
-  public void parseMessages() throws ParseException {
-    JSONParser parser = new JSONParser();
-    message = (JSONObject) parser.parse(messageString);
-    messageWithTiming = (JSONObject) parser.parse(messageWithTimingString);
-    alertMessage = (JSONObject) parser.parse(alertMessageString);
-  }
-
-  /**
-   * {
-   *  "riskLevelRules" : [
-   *   {
-   *    "rule" : "enrichedField1 == 'enrichedValue1'",
-   *    "score" : 10
-   *   }
-   *  ],
-   *  "aggregator" : "MAX"
-   * }
-   */
-  @Multiline
-  private static String testWithTriageConfig;
-
-  @Test
-  public void testWithTriage() throws IOException {
-    test(testWithTriageConfig, false);
-  }
-
-  /**
-   * {
-   *  "riskLevelRules" : [
-   *  {
-   *    "rule" : "enrichedField1 == 'enrichedValue1",
-   *    "score" : 10
-   *  }
-   *  ],
-   *  "aggregator" : "MAX"
-   * }
-   */
-  @Multiline
-  private static String testWithBadTriageRuleConfig;
-
-  @Test
-  public void testWithBadTriageRule() throws IOException {
-    test(testWithBadTriageRuleConfig, true);
-  }
-
-  @Test
-  public void testWithoutTriage() throws IOException {
-    test(null, false);
-  }
-
-  /**
-   * {
-   *   "riskLevelRules": [
-   *   {
-   *      "rule" : "not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))",
-   *      "score" : 10
-   *   }
-   *   ],
-   *   "aggregator": "MAX"
-   * }
-   */
-  @Multiline
-  private static String testWithStellarFunctionConfig;
-
-  @Test
-  public void testWithStellarFunction() throws IOException {
-    test(testWithStellarFunctionConfig, false);
-  }
-
-  public void test(String threatTriageConfig, boolean badConfig) throws IOException {
-
-    ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl");
-    threatIntelJoinBolt.setCuratorFramework(client);
-    threatIntelJoinBolt.setZKCache(cache);
-
-    SensorEnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(
-            new FileInputStream(enrichmentConfigPath), SensorEnrichmentConfig.class);
-    boolean withThreatTriage = threatTriageConfig != null;
-    if (withThreatTriage) {
-      try {
-        enrichmentConfig.getThreatIntel().setTriageConfig(JSONUtils.INSTANCE.load(threatTriageConfig, ThreatTriageConfig.class));
-        if (badConfig) {
-          Assert.fail(threatTriageConfig + "\nThis should not parse!");
-        }
-      } catch (JsonMappingException pe) {
-        if (!badConfig) {
-          throw pe;
-        }
-      }
-    }
-    threatIntelJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, enrichmentConfig);
-    HashMap<String, Object> globalConfig = new HashMap<>();
-    String baseDir = UnitTestHelper.findDir(new File("../metron-enrichment-common"), "GeoLite");
-    File geoHdfsFile = new File(new File(baseDir), "GeoLite2-City.mmdb.gz");
-    globalConfig.put(GeoLiteCityDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
-    File asnHdfsFile = new File(new File(baseDir), "GeoLite2-ASN.tar.gz");
-    globalConfig.put(GeoLiteAsnDatabase.ASN_HDFS_FILE, asnHdfsFile.getAbsolutePath());
-    threatIntelJoinBolt.getConfigurations().updateGlobalConfig(globalConfig);
-    threatIntelJoinBolt.withMaxCacheSize(100);
-    threatIntelJoinBolt.withMaxTimeRetain(10000);
-    threatIntelJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
-    Map<String, Object> fieldMap = threatIntelJoinBolt.getFieldMap("incorrectSourceType");
-    Assert.assertNull(fieldMap);
-
-    fieldMap = threatIntelJoinBolt.getFieldMap(sensorType);
-    Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel"));
-
-    MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class);
-    Tuple messageTuple = mock(Tuple.class);
-    when(messageGetStrategy.get(messageTuple)).thenReturn(message);
-    Map<String, Tuple> streamMessageMap = new HashMap<>();
-    streamMessageMap.put("message", messageTuple);
-    JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
-    assertFalse(joinedMessage.containsKey("is_alert"));
-
-    when(messageGetStrategy.get(messageTuple)).thenReturn(messageWithTiming);
-    joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
-    assertFalse(joinedMessage.containsKey("is_alert"));
-
-    when(messageGetStrategy.get(messageTuple)).thenReturn(alertMessage);
-    joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
-    assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert")));
-
-    if(withThreatTriage && !badConfig) {
-      assertTrue(joinedMessage.containsKey("threat.triage.score"));
-      Double score = (Double) joinedMessage.get("threat.triage.score");
-      assertTrue(Math.abs(10d - score) < 1e-10);
-    }
-    else {
-      assertFalse(joinedMessage.containsKey("threat.triage.score"));
-    }
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
deleted file mode 100644
index a27297f..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.bolt;
-
-import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class ThreatIntelSplitterBoltTest extends BaseEnrichmentBoltTest {
-
-  private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
-
-  @Test
-  public void test() throws IOException {
-    String threatIntelType = "hbaseThreatIntel";
-    ThreatIntelSplitterBolt threatIntelSplitterBolt = new ThreatIntelSplitterBolt("zookeeperUrl");
-    threatIntelSplitterBolt.setCuratorFramework(client);
-    threatIntelSplitterBolt.setZKCache(cache);
-    threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(enrichmentConfigPath));
-    threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-    Map<String, Object> fieldMap = threatIntelSplitterBolt.getFieldMap(sensorType);
-    Assert.assertTrue(fieldMap.containsKey(threatIntelType));
-    String fieldName = threatIntelSplitterBolt.getKeyName(threatIntelType, "field");
-    Assert.assertEquals("threatintels.hbaseThreatIntel.field", fieldName);
-  }
-}
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 6b047fb..a138c0d 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -23,18 +23,6 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
@@ -44,7 +32,6 @@ import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
 import org.apache.metron.enrichment.converter.EnrichmentHelper;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.integration.components.ConfigUploadComponent;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
 import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
@@ -54,6 +41,7 @@ import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
@@ -66,8 +54,21 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
 /**
- * Integration test for the 'Split-Join' enrichment topology.
+ * Integration test for the enrichment topology.
  */
 public class EnrichmentIntegrationTest extends BaseIntegrationTest {
 
@@ -98,12 +99,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   private static File geoHdfsFile;
   private static File asnHdfsFile;
 
-  protected String fluxPath() {
-    return "src/main/flux/enrichment/remote-splitjoin.yaml";
-  }
-
   private static List<byte[]> getInputMessages(String path){
-    try{
+    try {
       List<byte[]> ret = TestUtils.readSampleData(path);
       {
         //we want one of the fields without a destination IP to ensure that enrichments can function
@@ -113,7 +110,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
         ret.add(JSONUtils.INSTANCE.toJSONPretty(sansDestinationIp));
       }
       return ret;
-    }catch(IOException ioe){
+    } catch(IOException ioe){
       return null;
     }
   }
@@ -126,34 +123,41 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
-   * Returns the path to the topology properties template.
-   *
    * @return The path to the topology properties template.
    */
   public String getTemplatePath() {
-    return "src/main/config/enrichment-splitjoin.properties.j2";
+    return "src/main/config/enrichment.properties.j2";
+  }
+
+  /**
+   * @return The path to the flux file defining the topology.
+   */
+  public String fluxPath() {
+    return "src/main/flux/enrichment/remote.yaml";
   }
 
   /**
-   * Properties for the 'Split-Join' topology.
-   *
    * @return The topology properties.
    */
   public Properties getTopologyProperties() {
     return new Properties() {{
+
+      // storm
       setProperty("enrichment_workers", "1");
       setProperty("enrichment_acker_executors", "0");
       setProperty("enrichment_topology_worker_childopts", "");
       setProperty("topology_auto_credentials", "[]");
-      setProperty("enrichment_topology_max_spout_pending", "");
-      setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
+      setProperty("enrichment_topology_max_spout_pending", "500");
+
+      // kafka - zookeeper_quorum, kafka_brokers set elsewhere
       setProperty("kafka_security_protocol", "PLAINTEXT");
+      setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
       setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
       setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
       setProperty("enrichment_error_topic", ERROR_TOPIC);
       setProperty("threatintel_error_topic", ERROR_TOPIC);
-      setProperty("enrichment_join_cache_size", "1000");
-      setProperty("threatintel_join_cache_size", "1000");
+
+      // enrichment
       setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName());
       setProperty("enrichment_hbase_table", enrichmentsTableName);
       setProperty("enrichment_hbase_cf", cf);
@@ -161,19 +165,28 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
               "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
               "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
               "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+
+      // threat intel
       setProperty("threatintel_hbase_table", threatIntelTableName);
       setProperty("threatintel_hbase_cf", cf);
-      setProperty("enrichment_kafka_spout_parallelism", "1");
-      setProperty("enrichment_split_parallelism", "1");
-      setProperty("enrichment_stellar_parallelism", "1");
-      setProperty("enrichment_join_parallelism", "1");
-      setProperty("threat_intel_split_parallelism", "1");
-      setProperty("threat_intel_stellar_parallelism", "1");
-      setProperty("threat_intel_join_parallelism", "1");
-      setProperty("kafka_writer_parallelism", "1");
+
+      // parallelism
+      setProperty("unified_kafka_spout_parallelism", "1");
+      setProperty("unified_enrichment_parallelism", "1");
+      setProperty("unified_threat_intel_parallelism", "1");
+      setProperty("unified_kafka_writer_parallelism", "1");
+
+      // caches
+      setProperty("unified_enrichment_cache_size", "1000");
+      setProperty("unified_threat_intel_cache_size", "1000");
+
+      // threads
+      setProperty("unified_enrichment_threadpool_size", "1");
+      setProperty("unified_enrichment_threadpool_type", "FIXED");
     }};
   }
 
+
   @Test
   public void test() throws Exception {
 
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
deleted file mode 100644
index 93c9700..0000000
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment.integration;
-
-import org.apache.metron.common.Constants;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-
-import java.util.Properties;
-
-/**
- * Integration test for the 'Unified' enrichment topology.
- */
-public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
-
-  /**
-   * Returns the path to the topology properties template.
-   *
-   * @return The path to the topology properties template.
-   */
-  public String getTemplatePath() {
-    return "src/main/config/enrichment-unified.properties.j2";
-  }
-
-  /**
-   * Properties for the 'Unified' topology.
-   *
-   * @return The topology properties.
-   */
-  @Override
-  public Properties getTopologyProperties() {
-    return new Properties() {{
-
-      // storm
-      setProperty("enrichment_workers", "1");
-      setProperty("enrichment_acker_executors", "0");
-      setProperty("enrichment_topology_worker_childopts", "");
-      setProperty("topology_auto_credentials", "[]");
-      setProperty("enrichment_topology_max_spout_pending", "500");
-
-      // kafka - zookeeper_quorum, kafka_brokers set elsewhere
-      setProperty("kafka_security_protocol", "PLAINTEXT");
-      setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
-      setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
-      setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
-      setProperty("enrichment_error_topic", ERROR_TOPIC);
-      setProperty("threatintel_error_topic", ERROR_TOPIC);
-
-      // enrichment
-      setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName());
-      setProperty("enrichment_hbase_table", enrichmentsTableName);
-      setProperty("enrichment_hbase_cf", cf);
-      setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
-              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
-              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
-              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
-
-      // threat intel
-      setProperty("threatintel_hbase_table", threatIntelTableName);
-      setProperty("threatintel_hbase_cf", cf);
-
-      // parallelism
-      setProperty("unified_kafka_spout_parallelism", "1");
-      setProperty("unified_enrichment_parallelism", "1");
-      setProperty("unified_threat_intel_parallelism", "1");
-      setProperty("unified_kafka_writer_parallelism", "1");
-
-      // caches
-      setProperty("unified_enrichment_cache_size", "1000");
-      setProperty("unified_threat_intel_cache_size", "1000");
-
-      // threads
-      setProperty("unified_enrichment_threadpool_size", "1");
-      setProperty("unified_enrichment_threadpool_type", "FIXED");
-    }};
-  }
-
-  @Override
-  public String fluxPath() {
-    return "src/main/flux/enrichment/remote-unified.yaml";
-  }
-}