You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/10 16:53:57 UTC

[5/5] incubator-metron git commit: METRON-141: The ability to do threat triage closes apache/incubator-metron#108

METRON-141: The ability to do threat triage closes apache/incubator-metron#108


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/deed21e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/deed21e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/deed21e6

Branch: refs/heads/master
Commit: deed21e6e8d8a8924c199c38e68c69481263d0d1
Parents: 743f37b
Author: cstella <ce...@gmail.com>
Authored: Tue May 10 12:53:35 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 10 12:53:35 2016 -0400

----------------------------------------------------------------------
 .../files/config/sensors/bro.json               |   25 +-
 .../files/config/sensors/pcap.json              |   26 +-
 .../files/config/sensors/snort.json             |   34 +-
 .../files/config/sensors/yaf.json               |   29 +-
 .../metron_streaming/tasks/source_config.yml    |    2 +-
 metron-platform/metron-common/README.md         |  186 +++
 metron-platform/metron-common/pom.xml           |   22 +
 .../metron/common/query/generated/Predicate.g4  |  131 +++
 .../src/main/java/Predicate.tokens              |   31 +
 .../src/main/java/PredicateLexer.tokens         |   31 +
 .../metron/common/aggregator/Aggregator.java    |   26 +
 .../metron/common/aggregator/Aggregators.java   |   62 +
 .../metron/common/bolt/ConfiguredBolt.java      |    2 +-
 .../metron/common/cli/ConfigurationManager.java |  200 ++++
 .../metron/common/cli/ConfigurationsUtils.java  |  232 ----
 .../common/configuration/Configuration.java     |    1 -
 .../common/configuration/ConfigurationType.java |   75 ++
 .../common/configuration/Configurations.java    |    1 +
 .../configuration/ConfigurationsUtils.java      |  195 +++
 .../common/configuration/EnrichmentConfig.java  |  213 ----
 .../configuration/SensorEnrichmentConfig.java   |  129 --
 .../enrichment/EnrichmentConfig.java            |   71 ++
 .../enrichment/SensorEnrichmentConfig.java      |  108 ++
 .../SensorEnrichmentUpdateConfig.java           |  209 ++++
 .../common/configuration/enrichment/Type.java   |   24 +
 .../threatintel/ThreatIntelConfig.java          |   59 +
 .../threatintel/ThreatTriageConfig.java         |  101 ++
 .../apache/metron/common/query/BooleanOp.java   |   23 +
 .../metron/common/query/ErrorListener.java      |   50 +
 .../metron/common/query/FunctionMarker.java     |   21 +
 .../metron/common/query/LogicalFunctions.java   |   99 ++
 .../common/query/MapVariableResolver.java       |   33 +
 .../metron/common/query/ParseException.java     |   28 +
 .../metron/common/query/PredicateProcessor.java |   59 +
 .../metron/common/query/PredicateToken.java     |   58 +
 .../metron/common/query/QueryCompiler.java      |  288 +++++
 .../metron/common/query/StringFunctions.java    |   42 +
 .../metron/common/query/VariableResolver.java   |   23 +
 .../query/generated/PredicateBaseListener.java  |  336 ++++++
 .../common/query/generated/PredicateLexer.java  |  178 +++
 .../query/generated/PredicateListener.java      |  299 +++++
 .../common/query/generated/PredicateParser.java | 1108 ++++++++++++++++++
 .../src/main/scripts/zk_load_configs.sh         |    2 +-
 .../metron/common/bolt/ConfiguredBoltTest.java  |    8 +-
 .../ConfigurationManagerIntegrationTest.java    |  176 +++
 .../common/cli/ConfigurationsUtilsTest.java     |   13 +-
 .../configuration/EnrichmentConfigTest.java     |  211 ----
 .../SensorEnrichmentConfigTest.java             |    2 +-
 .../SensorEnrichmentUpdateConfigTest.java       |  224 ++++
 .../metron/common/query/QueryParserTest.java    |  152 +++
 .../src/test/resources/config/sensors/bro.json  |   30 +-
 .../src/main/assembly/assembly.xml              |    2 +-
 .../src/main/bash/Whois_CSV_to_JSON.py          |  208 ----
 .../src/main/bash/flatfile_loader.sh            |   42 -
 .../main/bash/prune_elasticsearch_indices.sh    |   21 -
 .../src/main/bash/prune_hdfs_files.sh           |   21 -
 .../src/main/bash/threatintel_bulk_load.sh      |   41 -
 .../src/main/bash/threatintel_bulk_prune.sh     |   40 -
 .../src/main/bash/threatintel_taxii_load.sh     |   42 -
 .../dataloads/bulk/ThreatIntelBulkLoader.java   |   12 +-
 .../SimpleEnrichmentFlatFileLoader.java         |   12 +-
 .../dataloads/nonbulk/taxii/TaxiiLoader.java    |   10 +-
 .../src/main/scripts/Whois_CSV_to_JSON.py       |  208 ++++
 .../src/main/scripts/flatfile_loader.sh         |   42 +
 .../main/scripts/prune_elasticsearch_indices.sh |   21 +
 .../src/main/scripts/prune_hdfs_files.sh        |   21 +
 .../src/main/scripts/threatintel_bulk_load.sh   |   41 +
 .../src/main/scripts/threatintel_bulk_prune.sh  |   40 +
 .../src/main/scripts/threatintel_taxii_load.sh  |   42 +
 .../writer/ElasticsearchWriter.java             |    2 +-
 metron-platform/metron-enrichment/pom.xml       |    1 +
 .../simplehbase/SimpleHBaseAdapter.java         |    2 +-
 .../threatintel/ThreatIntelAdapter.java         |    4 +-
 .../enrichment/bolt/BulkMessageWriterBolt.java  |    2 +-
 .../apache/metron/enrichment/bolt/CacheKey.java |    2 +-
 .../enrichment/bolt/EnrichmentJoinBolt.java     |   10 +-
 .../enrichment/bolt/EnrichmentSplitterBolt.java |    4 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |    2 +-
 .../apache/metron/enrichment/bolt/JoinBolt.java |   33 +-
 .../enrichment/bolt/ThreatIntelJoinBolt.java    |   59 +-
 .../bolt/ThreatIntelSplitterBolt.java           |    4 +-
 .../triage/ThreatTriageProcessor.java           |   51 +
 .../simplehbase/SimpleHBaseAdapterTest.java     |   34 +-
 .../threatintel/ThreatIntelAdapterTest.java     |   42 +-
 .../bolt/GenericEnrichmentBoltTest.java         |    4 +-
 .../bolt/ThreatIntelJoinBoltTest.java           |   65 +-
 .../threatintel/triage/ThreatTriageTest.java    |  136 +++
 .../metron/integration/BaseIntegrationTest.java |    2 +-
 .../integration/EnrichmentIntegrationTest.java  |    2 +
 .../metron/integration/utils/SampleUtil.java    |    2 +-
 .../resources/sample/config/sensors/bro.json    |   34 +-
 .../resources/sample/config/sensors/pcap.json   |   25 +-
 .../resources/sample/config/sensors/snort.json  |   37 +-
 .../resources/sample/config/sensors/yaf.json    |   62 +-
 .../SolrEnrichmentIntegrationTest.java          |    2 +-
 metron-platform/style/checkstyle.xml            |   10 +-
 metron-platform/style/suppressions.xml          |   27 +
 pom.xml                                         |    1 +
 98 files changed, 5793 insertions(+), 1422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json b/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
index 2b534b4..0eb34b3 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
@@ -1,19 +1,20 @@
 {
   "index": "bro",
   "batchSize": 5,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_dst_addr", "ip_src_addr"],
-    "host": ["host"]
+  "enrichment" : {
+    "fieldMap": {
+      "geo": ["ip_dst_addr", "ip_src_addr"],
+      "host": ["host"]
+    }
   },
-  "threatIntelFieldMap":
-  {
-    "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
-  },
-  "fieldToThreatIntelTypeMap":
-  {
-    "ip_src_addr" : ["malicious_ip"],
-    "ip_dst_addr" : ["malicious_ip"]
+  "threatIntel": {
+    "fieldMap": {
+      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
+    },
+    "fieldToTypeMap": {
+      "ip_src_addr" : ["malicious_ip"],
+      "ip_dst_addr" : ["malicious_ip"]
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json b/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
index 7792165..8a3eab1 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
@@ -1,19 +1,23 @@
 {
   "index": "pcap",
   "batchSize": 5,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_src_addr", "ip_dst_addr"],
-    "host": ["ip_src_addr", "ip_dst_addr"]
+  "enrichment" : {
+    "fieldMap":
+      {
+      "geo": ["ip_src_addr", "ip_dst_addr"],
+      "host": ["ip_src_addr", "ip_dst_addr"]
+    }
   },
-  "threatIntelFieldMap":
-  {
-    "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
-  },
-  "fieldToThreatIntelTypeMap":
-  {
-    "ip_dst_addr" : [ "malicious_ip" ]
+  "threatIntel" : {
+    "fieldMap":
+      {
+      "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
+    },
+    "fieldToTypeMap":
+      {
+      "ip_dst_addr" : [ "malicious_ip" ]
     ,"ip_src_addr" : [ "malicious_ip" ]
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json b/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
index c5b6dcc..9dfc80e 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
@@ -1,18 +1,28 @@
 {
   "index": "snort",
   "batchSize": 1,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_dst_addr", "ip_src_addr"],
-    "host": ["host"]
+  "enrichment" : {
+    "fieldMap":
+      {
+      "geo": ["ip_dst_addr", "ip_src_addr"],
+      "host": ["host"]
+    }
   },
- "threatIntelFieldMap":
-  {
-    "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
-  },
-  "fieldToThreatIntelTypeMap":
-  {
-    "ip_src_addr" : ["malicious_ip"],
-    "ip_dst_addr" : ["malicious_ip"]
+  "threatIntel" : {
+    "fieldMap":
+      {
+      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
+    },
+    "fieldToTypeMap":
+      {
+      "ip_src_addr" : ["malicious_ip"],
+      "ip_dst_addr" : ["malicious_ip"]
+    },
+    "triageConfig" : {
+      "riskLevelRules" : {
+        "not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))" : 10
+      },
+      "aggregator" : "MAX"
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json b/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
index 2b46c9a..4e67748 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
@@ -1,19 +1,22 @@
 {
   "index": "yaf",
   "batchSize": 5,
-  "enrichmentFieldMap":
-  {
-    "geo": ["ip_dst_addr", "ip_src_addr"],
-    "host": ["host"]
+  "enrichment" : {
+    "fieldMap":
+      {
+      "geo": ["ip_dst_addr", "ip_src_addr"],
+      "host": ["host"]
+    }
   },
-  "threatIntelFieldMap":
-  {
-    "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
-  },
-  "fieldToThreatIntelTypeMap":
-  {
-    "ip_src_addr" : ["malicious_ip"],
-    "ip_dst_addr" : ["malicious_ip"]
+  "threatIntel": {
+    "fieldMap":
+      {
+      "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
+    },
+    "fieldToTypeMap":
+      {
+      "ip_src_addr" : ["malicious_ip"],
+      "ip_dst_addr" : ["malicious_ip"]
+    }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/tasks/source_config.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/source_config.yml b/metron-deployment/roles/metron_streaming/tasks/source_config.yml
index f3039af..21292b1 100644
--- a/metron-deployment/roles/metron_streaming/tasks/source_config.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/source_config.yml
@@ -43,7 +43,7 @@
     - ../roles/metron_streaming/files/config/
 
 - name: Load Config
-  shell: "{{ metron_directory }}/bin/zk_load_configs.sh -p {{ zookeeper_config_path }} -z {{ zookeeper_url }} && touch {{ zookeeper_config_path }}/configured"
+  shell: "{{ metron_directory }}/bin/zk_load_configs.sh --mode PUSH -i {{ zookeeper_config_path }} -z {{ zookeeper_url }} && touch {{ zookeeper_config_path }}/configured"
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
new file mode 100644
index 0000000..60bd287
--- /dev/null
+++ b/metron-platform/metron-common/README.md
@@ -0,0 +1,186 @@
+#Query Language
+
+For a variety of components (currently only threat intelligence triage) we have the need to determine if a condition is true of the JSON documents being enriched.  For those purposes, there exists a simple DSL created to define those conditions.
+
+The query language supports the following:
+* Referencing fields in the enriched JSON
+* Simple boolean operations: and, not, or
+* Determining whether a field exists (via `exists`)
+* The ability to have parenthesis to make order of operations explicit
+* A fixed set of functions which take strings and return boolean.  Currently:
+    * `IN_SUBNET(ip, cidr1, cidr2, ...)`
+    * `IS_EMPTY(str)`
+    * `STARTS_WITH(str, prefix)`
+    * `ENDS_WITH(str, suffix)`
+    * `REGEXP_MATCH(str, pattern)`
+* A fixed set of string to string transformation functions:
+    * `TO_LOWER`
+    * `TO_UPPER`
+    * `TRIM`
+
+Example query:
+
+`IN_SUBNET( ip, '192.168.0.0/24') or ip in [ '10.0.0.1', '10.0.0.2' ] or exists(is_local)`
+
+This evaluates to true precisely when one of the following is true:
+* The value of the `ip` field is in the `192.168.0.0/24` subnet
+* The value of the `ip` field is `10.0.0.1` or `10.0.0.2`
+* The field `is_local` exists
+
+#Enrichment Configuration
+
+The configuration for the `enrichment` topology, the topology primarily
+responsible for enrichment and threat intelligence enrichment, is
+defined by JSON documents stored in zookeeper.
+
+There are two types of configurations at the moment, `global` and
+`sensor` specific.  
+
+##Global Configuration
+The format of the global enrichment is a JSON String to Object map.  This is intended for
+configuration which is non sensor specific configuration.
+
+This configuration is stored in zookeeper, but looks something like
+
+```json
+{
+  "es.clustername": "metron",
+  "es.ip": "node1",
+  "es.port": "9300",
+  "es.date.format": "yyyy.MM.dd.HH"
+}
+```
+
+##Sensor Enrichment Configuration
+
+The sensor specific configuration is intended to configure the
+individual enrichments and threat intelligence enrichments for a given
+sensor type (e.g. `snort`).
+
+Just like the global config, the format is a JSON stored in zookeeper.
+The configuration is a complex JSON object with the following top level fields:
+
+* `index` : The name of the sensor
+* `batchSize` : The size of the batch that is written to the indices at once.
+* `enrichment` : A complex JSON object representing the configuration of the enrichments
+* `threatIntel` : A complex JSON object representing the configuration of the threat intelligence enrichments
+
+###The `enrichment` Configuration 
+
+
+| Field            | Description                                                                                                                                                                                                                      | Example                                                                   |
+|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------|
+| `fieldToTypeMap` | In the case of a simple HBase enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known.  This enrichment type is used as part of the HBase key. | `"fieldToTypeMap" : { "ip_src_addr" : [ "asset_enrichment" ] }`  |
+| `fieldMap`       | The map of enrichment bolts names to fields in the JSON messages.,Each field is sent to the enrichment referenced in the key.                                                                                                    | `"fieldMap": {"hbaseEnrichment": ["ip_src_addr","ip_dst_addr"]}` |
+
+###The `threatIntel` Configuration 
+
+| Field            | Description                                                                                                                                                                                                                                      | Example                                                                  |
+|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
+| `fieldToTypeMap` | In the case of a simple HBase threat intel enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known.  This enrichment type is used as part of the HBase key. | `"fieldToTypeMap" : { "ip_src_addr" : [ "malicious_ips" ] }`             |
+| `fieldMap`       | The map of threat intel enrichment bolts names to fields in the JSON messages. Each field is sent to the threat intel enrichment bolt referenced in the key.                                                                              | `"fieldMap": {"hbaseThreatIntel": ["ip_src_addr","ip_dst_addr"]}`        |
+| `triageConfig`   | The configuration of the threat triage scorer.  In the situation where a threat is detected, a score is assigned to the message and embedded in the indexed message.                                                                             | `"riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 }` |
+
+The `triageConfig` field is also a complex field and it bears some description:
+
+| Field            | Description                                                                                                                                                | Example                                                                  |
+|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
+| `riskLevelRules` | The mapping of Metron Query Language (see above) queries to a score.                                                                                       | `"riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 }` |
+| `aggregator`     | An aggregation function that takes all non-zero scores representing the matching queries from `riskLevelRules` and aggregates them into a single score. | `"MAX"`                                                                  |
+
+The supported aggregation functions are:
+* `MAX` : The max of all of the associated values for matching queries
+* `MIN` : The min of all of the associated values for matching queries
+* `MEAN` : The mean of all of the associated values for matching queries
+* `POSITIVE_MEAN` : The mean of the positive associated values for the matching queries.
+
+###Example
+
+An example configuration for the YAF sensor is as follows:
+```json
+{
+  "index": "yaf",
+  "batchSize": 5,
+  "enrichment": {
+    "fieldMap": {
+      "geo": [
+        "ip_src_addr",
+        "ip_dst_addr"
+      ],
+      "host": [
+        "ip_src_addr",
+        "ip_dst_addr"
+      ],
+      "hbaseEnrichment": [
+        "ip_src_addr",
+        "ip_dst_addr"
+      ]
+    }
+  ,"fieldToTypeMap": {
+      "ip_src_addr": [
+        "playful_classification"
+      ],
+      "ip_dst_addr": [
+        "playful_classification"
+      ]
+    }
+  },
+  "threatIntel": {
+    "fieldMap": {
+      "hbaseThreatIntel": [
+        "ip_src_addr",
+        "ip_dst_addr"
+      ]
+    },
+    "fieldToTypeMap": {
+      "ip_src_addr": [
+        "malicious_ip"
+      ],
+      "ip_dst_addr": [
+        "malicious_ip"
+      ]
+    },
+    "triageConfig" : {
+      "riskLevelRules" : {
+        "ip_src_addr == '10.0.2.3' or ip_dst_addr == '10.0.2.3'" : 10
+      },
+      "aggregator" : "MAX"
+    }
+  }
+}
+```
+
+
+##Management Utility
+
+Configurations should be stored on disk in the following structure starting at `$BASE_DIR`:
+* global.json : The global config
+* `sensors` : The subdirectory containing sensor enrichment configuration JSON (e.g. `snort.json`, `bro.json`)
+
+By default, this directory as deployed by the ansible infrastructure is at `$METRON_HOME/config/zookeeper`
+
+While the configs are stored on disk, they must be loaded into Zookeeper to be used.  To this end, there is a
+utility program to assist in this called `$METRON_HOME/bin/zk_load_config.sh`
+
+This has the following options:
+
+```
+ -f,--force                                Force operation
+ -h,--help                                 Generate Help screen
+ -i,--input_dir <DIR>                      The input directory containing
+                                           the configuration files named
+                                           like "$source.json"
+ -m,--mode <MODE>                          The mode of operation: DUMP,
+                                           PULL, PUSH
+ -o,--output_dir <DIR>                     The output directory which will
+                                           store the JSON configuration
+                                           from Zookeeper
+ -z,--zk_quorum <host:port,[host:port]*>   Zookeeper Quorum URL
+                                           (zk1:port,zk2:port,...)
+```
+
+Usage examples:
+
+* To dump the existing configs from zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m DUMP`
+* To push the configs into zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PUSH -i $METRON_HOME/config/zookeeper`
+* To pull the configs from zookeeper to the singlenode vagrant machine disk: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PULL -o $METRON_HOME/config/zookeeper -f`

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 91d50fe..9d439e0 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -27,6 +27,7 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <commons.config.version>1.10</commons.config.version>
+        <antlr.version>4.5</antlr.version>
     </properties>
     <repositories>
         <repository>
@@ -47,6 +48,11 @@
             <version>${global_json_simple_version}</version>
         </dependency>
         <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>${antlr.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${global_storm_version}</version>
@@ -236,6 +242,22 @@
                 </configuration>
             </plugin>
             <plugin>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-maven-plugin</artifactId>
+                <version>${antlr.version}</version>
+                <configuration>
+                  <outputDirectory>${basedir}/src/main/java</outputDirectory>
+                </configuration>
+                <executions>
+                  <execution>
+                    <goals>
+                      <goal>antlr4</goal>
+                    </goals>
+                  </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <version>${global_shade_version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4 b/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4
new file mode 100644
index 0000000..73c60cd
--- /dev/null
+++ b/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar Predicate;
+
+@header {
+//CHECKSTYLE:OFF
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+}
+
+/* Lexical rules */
+
+AND : 'and'
+    | '&&'
+    | 'AND'
+    ;
+OR  : 'or'
+    | '||'
+    | 'OR';
+
+NOT : 'not'
+    | 'NOT';
+
+TRUE  : 'true'
+      | 'TRUE' ;
+
+FALSE : 'false'
+      | 'FALSE';
+
+EQ : '==' ;
+NEQ : '!=' ;
+COMMA : ',';
+
+LBRACKET : '[';
+RBRACKET : ']';
+LPAREN : '(' ;
+RPAREN : ')' ;
+
+IN : 'in'
+   ;
+NIN : 'not in'
+   ;
+EXISTS : 'exists';
+IDENTIFIER : [a-zA-Z_][a-zA-Z_\.0-9]* ;
+fragment SCHAR:  ~['"\\\r\n];
+STRING_LITERAL : '"' SCHAR* '"'
+               | '\'' SCHAR* '\'' ;
+SEMI : ';' ;
+
+
+// COMMENT and WS are stripped from the output token stream by sending
+// to a different channel 'skip'
+
+COMMENT : '//' .+? ('\n'|EOF) -> skip ;
+
+WS : [ \r\t\u000C\n]+ -> skip ;
+
+
+/* Parser rules */
+
+single_rule : logical_expr EOF;
+
+logical_expr
+ : logical_expr AND logical_expr # LogicalExpressionAnd
+ | logical_expr OR logical_expr  # LogicalExpressionOr
+ | comparison_expr               # ComparisonExpression
+ | LPAREN logical_expr RPAREN    # LogicalExpressionInParen
+ | NOT LPAREN logical_expr RPAREN #NotFunc
+ | logical_entity                # LogicalEntity
+ ;
+
+comparison_expr : comparison_operand comp_operator comparison_operand # ComparisonExpressionWithOperator
+                | identifier_operand IN list_entity #InExpression
+                | identifier_operand NIN list_entity #NInExpression
+                | LPAREN comparison_expr RPAREN # ComparisonExpressionParens
+                ;
+
+logical_entity : (TRUE | FALSE) # LogicalConst
+               | EXISTS LPAREN IDENTIFIER RPAREN #ExistsFunc
+               | IDENTIFIER LPAREN func_args RPAREN #LogicalFunc
+               ;
+
+list_entity : LBRACKET op_list RBRACKET
+            ;
+func_args : op_list
+          ;
+op_list : identifier_operand
+        | op_list COMMA identifier_operand
+        ;
+identifier_operand : STRING_LITERAL # StringLiteral
+                   | IDENTIFIER     # LogicalVariable
+                   | IDENTIFIER LPAREN func_args RPAREN #StringFunc
+                   ;
+
+comparison_operand : identifier_operand #IdentifierOperand
+                   | logical_entity # LogicalConstComparison
+                   ;
+
+comp_operator : (EQ | NEQ) # ComparisonOp
+              ;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/Predicate.tokens
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/Predicate.tokens b/metron-platform/metron-common/src/main/java/Predicate.tokens
new file mode 100644
index 0000000..e0726f9
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/Predicate.tokens
@@ -0,0 +1,31 @@
+AND=1
+OR=2
+NOT=3
+TRUE=4
+FALSE=5
+EQ=6
+NEQ=7
+COMMA=8
+LBRACKET=9
+RBRACKET=10
+LPAREN=11
+RPAREN=12
+IN=13
+NIN=14
+EXISTS=15
+IDENTIFIER=16
+STRING_LITERAL=17
+SEMI=18
+COMMENT=19
+WS=20
+'=='=6
+'!='=7
+','=8
+'['=9
+']'=10
+'('=11
+')'=12
+'in'=13
+'not in'=14
+'exists'=15
+';'=18

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/PredicateLexer.tokens
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/PredicateLexer.tokens b/metron-platform/metron-common/src/main/java/PredicateLexer.tokens
new file mode 100644
index 0000000..e0726f9
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/PredicateLexer.tokens
@@ -0,0 +1,31 @@
+AND=1
+OR=2
+NOT=3
+TRUE=4
+FALSE=5
+EQ=6
+NEQ=7
+COMMA=8
+LBRACKET=9
+RBRACKET=10
+LPAREN=11
+RPAREN=12
+IN=13
+NIN=14
+EXISTS=15
+IDENTIFIER=16
+STRING_LITERAL=17
+SEMI=18
+COMMENT=19
+WS=20
+'=='=6
+'!='=7
+','=8
+'['=9
+']'=10
+'('=11
+')'=12
+'in'=13
+'not in'=14
+'exists'=15
+';'=18

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java
new file mode 100644
index 0000000..d7ca31e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.aggregator;
+
+import java.util.List;
+import java.util.Map;
+
+public interface Aggregator {
+  Double aggregate(List<Number> scores, Map<String, Object> config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java
new file mode 100644
index 0000000..35b7ada
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.aggregator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+import java.util.function.Predicate;
+
+public enum Aggregators implements Aggregator {
+   MAX( (numbers, config) -> accumulate(0d, (x,y) -> Math.max(x.doubleValue(),y.doubleValue()), numbers))
+  ,MIN( (numbers, config) -> accumulate(0d, (x,y) -> Math.min(x.doubleValue(),y.doubleValue()), numbers))
+  ,SUM( (numbers, config) -> accumulate(0d, (x,y) -> x.doubleValue() + y.doubleValue(), numbers))
+  ,MEAN( (numbers, config) -> scale(SUM.aggregate(numbers, config), numbers, n -> true))
+  ,POSITIVE_MEAN( (numbers, config) -> scale(SUM.aggregate(numbers, config), numbers, n -> n.doubleValue() > 0))
+  ;
+  Aggregator aggregator;
+  Aggregators(Aggregator agg) {
+    aggregator = agg;
+  }
+  public Aggregator getAggregator() {
+    return aggregator;
+  }
+
+  private static double accumulate(double initial, BinaryOperator<Number> op, List<Number> list) {
+    if(list.isEmpty()) {
+      return 0d;
+    }
+    return list.stream()
+               .reduce(initial, op)
+               .doubleValue();
+  }
+
+  private static double scale(double numberToScale, List<Number> list, Predicate<Number> filterFunc) {
+    double scale = list.stream().filter(filterFunc).count();
+    if(scale < 1e-5) {
+      scale = 1;
+    }
+    return numberToScale / scale;
+  }
+
+  @Override
+  public Double aggregate(List<Number> scores, Map<String, Object> config) {
+    return aggregator.aggregate(scores, config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index aa654fb..1364305 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -30,7 +30,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 
 import java.io.IOException;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java
new file mode 100644
index 0000000..9685841
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.cli;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ConfigurationType;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+public class ConfigurationManager {
+  public enum ConfigurationOptions {
+    HELP("h", s -> new Option(s, "help", false, "Generate Help screen"))
+   ,INPUT("i", s -> OptionBuilder.isRequired(false)
+                                 .withLongOpt("input_dir")
+                                 .hasArg()
+                                 .withArgName("DIR")
+                                 .withDescription("The input directory containing the configuration files named like \"$source.json\"")
+                                 .create(s)
+         )
+    ,OUTPUT("o", s -> OptionBuilder.isRequired(false)
+                                 .hasArg()
+                                 .withLongOpt("output_dir")
+                                 .withArgName("DIR")
+                                 .withDescription("The output directory which will store the JSON configuration from Zookeeper")
+                                 .create(s)
+         )
+    ,ZK_QUORUM("z", s -> OptionBuilder.isRequired(true)
+                                 .hasArg()
+                                 .withLongOpt("zk_quorum")
+                                 .withArgName("host:port,[host:port]*")
+                                 .withDescription("Zookeeper Quorum URL (zk1:port,zk2:port,...)")
+                                 .create(s)
+         )
+    ,MODE("m", s -> OptionBuilder.isRequired(true)
+                                 .hasArg()
+                                 .withLongOpt("mode")
+                                 .withArgName("MODE")
+                                 .withDescription("The mode of operation: DUMP, PULL, PUSH")
+                                 .create(s)
+         )
+    ,FORCE("f", s -> new Option(s, "force", false, "Force operation"))
+    ;
+    Option option;
+    String shortCode;
+    ConfigurationOptions(String shortCode, Function<String, Option> optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if(ConfigurationOptions.HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "configuration_manager", getOptions());
+    }
+
+    public static Options getOptions() {
+      Options ret = new Options();
+      for(ConfigurationOptions o : ConfigurationOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+
+  public void dump(CuratorFramework client) throws Exception {
+    ConfigurationsUtils.dumpConfigs(System.out, client);
+  }
+
+
+
+  public void pull(CuratorFramework client, String outFileStr, final boolean force) throws Exception {
+    final File outputDir = new File(outFileStr);
+    if (!outputDir.exists()) {
+      if (!outputDir.mkdirs()) {
+        throw new IllegalStateException("Unable to make directories: " + outputDir.getAbsolutePath());
+      }
+    }
+
+    ConfigurationsUtils.visitConfigs(client, new ConfigurationsUtils.ConfigurationVisitor() {
+      @Override
+      public void visit(ConfigurationType configurationType, String name, String data) {
+        File out = getFile(outputDir, configurationType, name);
+        if (!out.exists() || force) {
+          if(!out.exists()) {
+            out.getParentFile().mkdirs();
+          }
+          try {
+            Files.write(data, out, Charset.defaultCharset());
+          } catch (IOException e) {
+            throw new RuntimeException("Sorry, something bad happened writing the config to " + out.getAbsolutePath() + ": " + e.getMessage(), e);
+          }
+        }
+        else if(out.exists() && !force) {
+          throw new IllegalStateException("Unable to overwrite existing file (" + out.getAbsolutePath() + ") without the force flag (-f or --force) being set.");
+        }
+      }
+    });
+  }
+
+  public void push(String inputDirStr, CuratorFramework client) throws Exception {
+      final File inputDir = new File(inputDirStr);
+
+      if(!inputDir.exists() || !inputDir.isDirectory()) {
+        throw new IllegalStateException("Input directory: " + inputDir + " does not exist or is not a directory.");
+      }
+      ConfigurationsUtils.uploadConfigsToZookeeper(inputDirStr, client);
+  }
+
+  public void run(CommandLine cli) throws Exception {
+    try(CuratorFramework client = ConfigurationsUtils.getClient(ConfigurationOptions.ZK_QUORUM.get(cli))) {
+      client.start();
+      run(client, cli);
+    }
+  }
+  public void run(CuratorFramework client, CommandLine cli) throws Exception {
+    final boolean force = ConfigurationOptions.FORCE.has(cli);
+    String mode = ConfigurationOptions.MODE.get(cli);
+
+    if (mode.toLowerCase().equals("push")) {
+      String inputDirStr = ConfigurationOptions.INPUT.get(cli);
+      push(inputDirStr, client);
+    }
+    else {
+
+      switch (mode.toLowerCase()) {
+
+        case "dump":
+          dump(client);
+          break;
+
+        case "pull":
+          pull(client, ConfigurationOptions.OUTPUT.get(cli), force);
+          break;
+
+        default:
+          throw new IllegalStateException("Invalid mode: " + mode + " expected DUMP, PULL or PUSH");
+      }
+    }
+  }
+
+  private static File getFile(File baseDir, ConfigurationType configurationType, String name) {
+    return new File(new File(baseDir, configurationType.getDirectory()), name + ".json");
+  }
+
+  public static void main(String... argv) throws Exception {
+    CommandLineParser parser = new PosixParser();
+    CommandLine cli = ConfigurationOptions.parse(parser, argv);
+    ConfigurationManager manager = new ConfigurationManager();
+    manager.run(cli);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
deleted file mode 100644
index 27f4c2a..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.cli;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.SensorEnrichmentConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ConfigurationsUtils {
-
-  public static CuratorFramework getClient(String zookeeperUrl) {
-    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
-  }
-
-  public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
-    writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), zookeeperUrl);
-  }
-
-  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
-    CuratorFramework client = getClient(zookeeperUrl);
-    client.start();
-    try {
-      writeGlobalConfigToZookeeper(globalConfig, client);
-    }
-    finally {
-      client.close();
-    }
-  }
-
-  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
-    writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, globalConfig, client);
-  }
-
-  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
-    writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
-  }
-
-  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
-    CuratorFramework client = getClient(zookeeperUrl);
-    client.start();
-    try {
-      writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
-    }
-    finally {
-      client.close();
-    }
-  }
-
-  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
-    writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
-  }
-
-  public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
-    writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
-  }
-
-  public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
-    CuratorFramework client = getClient(zookeeperUrl);
-    client.start();
-    try {
-      writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
-    }
-    finally {
-      client.close();
-    }
-  }
-
-  public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
-    try {
-      client.setData().forPath(path, configData);
-    } catch (KeeperException.NoNodeException e) {
-      client.create().creatingParentsIfNeeded().forPath(path, configData);
-    }
-  }
-
-  public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
-    configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
-    List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
-    for(String sensorType: sensorTypes) {
-      configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
-    }
-  }
-
-  public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
-    return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
-  }
-
-  public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
-  }
-
-  public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
-    return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
-  }
-
-  public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
-    return client.getData().forPath(path);
-  }
-
-  public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
-    ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
-    Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
-    for(String sensorType: sensorEnrichmentConfigs.keySet()) {
-      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
-    }
-  }
-
-  public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
-    return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
-  }
-
-  public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
-    Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
-    for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
-      sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
-    }
-    return sensorEnrichmentConfigs;
-  }
-
-  public static void dumpConfigs(String zookeeperUrl) throws Exception {
-    CuratorFramework client = getClient(zookeeperUrl);
-    client.start();
-    //Output global configs
-    {
-      System.out.println("Global config");
-      byte[] globalConfigData = client.getData().forPath(Constants.ZOOKEEPER_GLOBAL_ROOT);
-      System.out.println(new String(globalConfigData));
-    }
-    //Output sensor specific configs
-    {
-      List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
-      for (String child : children) {
-        byte[] data = client.getData().forPath(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + child);
-        System.out.println("Config for source " + child);
-        System.out.println(new String(data));
-        System.out.println();
-      }
-    }
-    client.close();
-  }
-
-  public static void main(String[] args) {
-
-    Options options = new Options();
-    {
-      Option o = new Option("h", "help", false, "This screen");
-      o.setRequired(false);
-      options.addOption(o);
-    }
-    {
-      Option o = new Option("p", "config_files", true, "Path to the source config files.  Must be named like \"$source\".json");
-      o.setArgName("DIR_NAME");
-      o.setRequired(false);
-      options.addOption(o);
-    }
-    {
-      Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
-      o.setArgName("ZK_QUORUM");
-      o.setRequired(true);
-      options.addOption(o);
-    }
-
-    try {
-      CommandLineParser parser = new PosixParser();
-      CommandLine cmd = null;
-      try {
-        cmd = parser.parse(options, args);
-      } catch (ParseException pe) {
-        pe.printStackTrace();
-        final HelpFormatter usageFormatter = new HelpFormatter();
-        usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
-        System.exit(-1);
-      }
-      if (cmd.hasOption("h")) {
-        final HelpFormatter usageFormatter = new HelpFormatter();
-        usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
-        System.exit(0);
-      }
-
-      String zkQuorum = cmd.getOptionValue("z");
-      if (cmd.hasOption("p")) {
-        String sourcePath = cmd.getOptionValue("p");
-        uploadConfigsToZookeeper(sourcePath, zkQuorum);
-      }
-
-      ConfigurationsUtils.dumpConfigs(zkQuorum);
-
-    } catch (Exception e) {
-      e.printStackTrace();
-      System.exit(-1);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
index 1ccf47b..e526ee4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
@@ -18,7 +18,6 @@
 package org.apache.metron.common.configuration;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.cli.ConfigurationsUtils;
 
 import java.nio.file.Path;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
new file mode 100644
index 0000000..2b9f6cf
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+public enum ConfigurationType implements Function<String, Object> {
+  GLOBAL("."
+        ,Constants.ZOOKEEPER_GLOBAL_ROOT
+        , s -> {
+    try {
+      return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+      });
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to load " + s, e);
+    }
+  })
+  , SENSOR(Constants.SENSORS_CONFIG_NAME
+          ,Constants.ZOOKEEPER_SENSOR_ROOT
+          , s -> {
+    try {
+      return JSONUtils.INSTANCE.load(s, SensorEnrichmentConfig.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to load " + s, e);
+    }
+  });
+  String directory;
+  String zookeeperRoot;
+  Function<String,?> deserializer;
+  ConfigurationType(String directory, String zookeeperRoot, Function<String, ?> deserializer) {
+    this.directory = directory;
+    this.zookeeperRoot = zookeeperRoot;
+    this.deserializer = deserializer;
+  }
+
+  public String getDirectory() {
+    return directory;
+  }
+
+  public Object deserialize(String s)
+  {
+    return deserializer.apply(s);
+  }
+  @Override
+  public Object apply(String s) {
+    return deserialize(s);
+  }
+
+  public String getZookeeperRoot() {
+    return zookeeperRoot;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
index 6aaa2b4..a152d40 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -19,6 +19,7 @@ package org.apache.metron.common.configuration;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.ByteArrayInputStream;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
new file mode 100644
index 0000000..1aa2ca8
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigurationsUtils {
+
+  public static CuratorFramework getClient(String zookeeperUrl) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+  }
+
+  public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
+    try(CuratorFramework client = getClient(zookeeperUrl)) {
+     client.start();
+      writeGlobalConfigToZookeeper(globalConfig, client);
+    }
+  }
+  public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, CuratorFramework client) throws Exception {
+    writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), client);
+  }
+
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeGlobalConfigToZookeeper(globalConfig, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
+    ConfigurationType.GLOBAL.deserialize(new String(globalConfig));
+    writeToZookeeper(ConfigurationType.GLOBAL.getZookeeperRoot(), globalConfig, client);
+  }
+
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
+    writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
+  }
+
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+    ConfigurationType.SENSOR.deserialize(new String(configData));
+    writeToZookeeper(ConfigurationType.SENSOR.getZookeeperRoot()+ "/" + sensorType, configData, client);
+  }
+
+  public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
+    writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
+  }
+
+  public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
+    CuratorFramework client = getClient(zookeeperUrl);
+    client.start();
+    try {
+      writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
+    }
+    finally {
+      client.close();
+    }
+  }
+
+  public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
+    try {
+      client.setData().forPath(path, configData);
+    } catch (KeeperException.NoNodeException e) {
+      client.create().creatingParentsIfNeeded().forPath(path, configData);
+    }
+  }
+
+  public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
+    configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
+    List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+    for(String sensorType: sensorTypes) {
+      configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+    }
+  }
+
+  public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+  }
+
+  public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+  }
+
+  public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
+    return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
+  }
+
+  public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
+    return client.getData().forPath(path);
+  }
+
+  public static void uploadConfigsToZookeeper(String rootFilePath, CuratorFramework client) throws Exception {
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), client);
+    Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
+    for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), client);
+    }
+  }
+
+  public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
+    try(CuratorFramework client = getClient(zookeeperUrl)) {
+      client.start();
+      uploadConfigsToZookeeper(rootFilePath, client);
+    }
+  }
+
+  public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
+    return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+  }
+
+  public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
+    Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
+    for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
+      if(file.getName().endsWith(".json")) {
+        sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+      }
+    }
+    return sensorEnrichmentConfigs;
+  }
+
+  public interface ConfigurationVisitor{
+    void visit(ConfigurationType configurationType, String name, String data);
+  }
+  public static void visitConfigs(CuratorFramework client, ConfigurationVisitor callback) throws Exception {
+    //Output global configs
+    {
+      ConfigurationType configType = ConfigurationType.GLOBAL;
+      byte[] globalConfigData = client.getData().forPath(configType.getZookeeperRoot());
+      callback.visit(configType, "global", new String(globalConfigData));
+    }
+    //Output sensor specific configs
+    {
+      ConfigurationType configType = ConfigurationType.SENSOR;
+      List<String> children = client.getChildren().forPath(configType.getZookeeperRoot());
+      for (String child : children) {
+        byte[] data = client.getData().forPath(configType.getZookeeperRoot() + "/" + child);
+        callback.visit(configType, child, new String(data));
+      }
+    }
+  }
+  public static void dumpConfigs(PrintStream out, CuratorFramework client) throws Exception {
+    ConfigurationsUtils.visitConfigs(client, (type, name, data) -> {
+      type.deserialize(data);
+      out.println(type + " Config: " + name + "\n" + data);
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
deleted file mode 100644
index bcc91fa..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.configuration;
-
-import com.google.common.base.Joiner;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.cli.ConfigurationsUtils;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class EnrichmentConfig {
-  public static enum Type {
-     THREAT_INTEL
-    ,ENRICHMENT
-  }
-
-  protected static final Logger _LOG = LoggerFactory.getLogger(EnrichmentConfig.class);
-  public static class FieldList {
-    Type type;
-    Map<String, List<String>> fieldToEnrichmentTypes;
-
-    public Type getType() {
-      return type;
-    }
-
-    public void setType(Type type) {
-      this.type = type;
-    }
-
-    public Map<String, List<String>> getFieldToEnrichmentTypes() {
-      return fieldToEnrichmentTypes;
-    }
-
-    public void setFieldToEnrichmentTypes(Map<String, List<String>> fieldToEnrichmentTypes) {
-      this.fieldToEnrichmentTypes = fieldToEnrichmentTypes;
-    }
-  }
-  public String zkQuorum;
-  public Map<String, FieldList> sensorToFieldList;
-
-  public String getZkQuorum() {
-    return zkQuorum;
-  }
-
-  public void setZkQuorum(String zkQuorum) {
-    this.zkQuorum = zkQuorum;
-  }
-
-  public Map<String, FieldList> getSensorToFieldList() {
-    return sensorToFieldList;
-  }
-
-  public void setSensorToFieldList(Map<String, FieldList> sensorToFieldList) {
-    this.sensorToFieldList = sensorToFieldList;
-  }
-
-  public void updateSensorConfigs( ) throws Exception {
-    CuratorFramework client = ConfigurationsUtils.getClient(getZkQuorum());
-    try {
-      client.start();
-      updateSensorConfigs(new ZKSourceConfigHandler(client), sensorToFieldList);
-    }
-    finally {
-      client.close();
-    }
-  }
-
-  public static interface SourceConfigHandler {
-    SensorEnrichmentConfig readConfig(String sensor) throws Exception;
-    void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception;
-  }
-
-  public static class ZKSourceConfigHandler implements SourceConfigHandler {
-    CuratorFramework client;
-    public ZKSourceConfigHandler(CuratorFramework client) {
-      this.client = client;
-    }
-    @Override
-    public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
-      SensorEnrichmentConfig sensorEnrichmentConfig = new SensorEnrichmentConfig();
-      try {
-        sensorEnrichmentConfig = SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
-      }catch (KeeperException.NoNodeException e) {
-        sensorEnrichmentConfig.setIndex(sensor);
-        sensorEnrichmentConfig.setBatchSize(1);
-      }
-      return sensorEnrichmentConfig;
-    }
-
-    @Override
-    public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
-      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.toJSON().getBytes(), client);
-    }
-  }
-
-  public static void updateSensorConfigs( SourceConfigHandler scHandler
-                                        , Map<String, FieldList> sensorToFieldList
-                                        ) throws Exception
-  {
-    Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
-    for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
-      SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
-      if(config == null) {
-        config = scHandler.readConfig(kv.getKey());
-        if(_LOG.isDebugEnabled()) {
-          _LOG.debug(config.toJSON());
-        }
-      }
-      Map<String, List<String> > fieldMap = null;
-      Map<String, List<String> > fieldToTypeMap = null;
-      List<String> fieldList = null;
-      if(kv.getValue().type == Type.THREAT_INTEL) {
-        fieldMap = config.getThreatIntelFieldMap();
-        if(fieldMap!= null) {
-          fieldList = fieldMap.get(Constants.SIMPLE_HBASE_THREAT_INTEL);
-        } else {
-          fieldMap = new HashMap<>();
-        }
-        if(fieldList == null) {
-          fieldList = new ArrayList<>();
-          fieldMap.put(Constants.SIMPLE_HBASE_THREAT_INTEL, fieldList);
-        }
-        fieldToTypeMap = config.getFieldToThreatIntelTypeMap();
-        if(fieldToTypeMap == null) {
-          fieldToTypeMap = new HashMap<>();
-          config.setFieldToThreatIntelTypeMap(fieldToTypeMap);
-        }
-      }
-      else if(kv.getValue().type == Type.ENRICHMENT) {
-        fieldMap = config.getEnrichmentFieldMap();
-        if(fieldMap!= null) {
-          fieldList = fieldMap.get(Constants.SIMPLE_HBASE_ENRICHMENT);
-        } else {
-          fieldMap = new HashMap<>();
-        }
-        if(fieldList == null) {
-          fieldList = new ArrayList<>();
-          fieldMap.put(Constants.SIMPLE_HBASE_ENRICHMENT, fieldList);
-        }
-        fieldToTypeMap = config.getFieldToEnrichmentTypeMap();
-        if(fieldToTypeMap == null) {
-          fieldToTypeMap = new HashMap<>();
-          config.setFieldToEnrichmentTypeMap(fieldToTypeMap);
-        }
-      }
-      if(fieldToTypeMap == null  || fieldMap == null) {
-        _LOG.debug("fieldToTypeMap is null or fieldMap is null, so skipping");
-        continue;
-      }
-      //Add the additional fields to the field list associated with the hbase adapter
-      {
-        HashSet<String> fieldSet = new HashSet<>(fieldList);
-        List<String> additionalFields = new ArrayList<>();
-        for (String field : kv.getValue().getFieldToEnrichmentTypes().keySet()) {
-          if (!fieldSet.contains(field)) {
-            additionalFields.add(field);
-          }
-        }
-        //adding only the ones that we don't already have to the field list
-        if (additionalFields.size() > 0) {
-          _LOG.debug("Adding additional fields: " + Joiner.on(',').join(additionalFields));
-          fieldList.addAll(additionalFields);
-          sourceConfigsChanged.put(kv.getKey(), config);
-        }
-      }
-      //Add the additional enrichment types to the mapping between the fields
-      {
-        for(Map.Entry<String, List<String>> fieldToType : kv.getValue().getFieldToEnrichmentTypes().entrySet()) {
-          String field = fieldToType.getKey();
-          final HashSet<String> types = new HashSet<>(fieldToType.getValue());
-          int sizeBefore = 0;
-          if(fieldToTypeMap.containsKey(field)) {
-            List<String> typeList = fieldToTypeMap.get(field);
-            sizeBefore = new HashSet<>(typeList).size();
-            types.addAll(typeList);
-          }
-          int sizeAfter = types.size();
-          boolean changed = sizeBefore != sizeAfter;
-          if(changed) {
-            fieldToTypeMap.put(field, new ArrayList<String>() {{
-                addAll(types);
-              }});
-            sourceConfigsChanged.put(kv.getKey(), config);
-          }
-        }
-      }
-    }
-    for(Map.Entry<String, SensorEnrichmentConfig> kv : sourceConfigsChanged.entrySet()) {
-      scHandler.persistConfig(kv.getKey(), kv.getValue());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
deleted file mode 100644
index 6a45ec9..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.configuration;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.metron.common.utils.JSONUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SensorEnrichmentConfig {
-
-  private String index;
-  private Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
-  private Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
-  private Map<String, List<String>> fieldToEnrichmentTypeMap = new HashMap<>();
-  private Map<String, List<String>> fieldToThreatIntelTypeMap = new HashMap<>();
-  private int batchSize;
-
-  public String getIndex() {
-    return index;
-  }
-
-  public void setIndex(String index) {
-    this.index = index;
-  }
-
-  public Map<String, List<String>> getEnrichmentFieldMap() {
-    return enrichmentFieldMap;
-  }
-
-  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
-    this.enrichmentFieldMap = enrichmentFieldMap;
-  }
-
-  public Map<String, List<String>> getThreatIntelFieldMap() {
-    return threatIntelFieldMap;
-  }
-
-  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
-    this.threatIntelFieldMap = threatIntelFieldMap;
-  }
-
-  public Map<String, List<String>> getFieldToEnrichmentTypeMap() {
-    return fieldToEnrichmentTypeMap;
-  }
-
-  public Map<String, List<String>> getFieldToThreatIntelTypeMap() {
-    return fieldToThreatIntelTypeMap;
-  }
-  public void setFieldToEnrichmentTypeMap(Map<String, List<String>> fieldToEnrichmentTypeMap) {
-    this.fieldToEnrichmentTypeMap = fieldToEnrichmentTypeMap;
-  }
-
-  public void setFieldToThreatIntelTypeMap(Map<String, List<String>> fieldToThreatIntelTypeMap) {
-    this.fieldToThreatIntelTypeMap= fieldToThreatIntelTypeMap;
-  }
-  public int getBatchSize() {
-    return batchSize;
-  }
-
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
-  }
-
-  public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
-    return JSONUtils.INSTANCE.load(new String(config), SensorEnrichmentConfig.class);
-  }
-
-  public String toJSON() throws JsonProcessingException {
-    return JSONUtils.INSTANCE.toJSON(this, true);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
-
-    if (getBatchSize() != that.getBatchSize()) return false;
-    if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
-    if (getEnrichmentFieldMap() != null ? !getEnrichmentFieldMap().equals(that.getEnrichmentFieldMap()) : that.getEnrichmentFieldMap() != null)
-      return false;
-    if (getThreatIntelFieldMap() != null ? !getThreatIntelFieldMap().equals(that.getThreatIntelFieldMap()) : that.getThreatIntelFieldMap() != null)
-      return false;
-    if (getFieldToEnrichmentTypeMap() != null ? !getFieldToEnrichmentTypeMap().equals(that.getFieldToEnrichmentTypeMap()) : that.getFieldToEnrichmentTypeMap() != null)
-      return false;
-    return getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().equals(that.getFieldToThreatIntelTypeMap()) : that.getFieldToThreatIntelTypeMap() == null;
-
-  }
-
-  @Override
-  public String toString() {
-    return "{index=" + index + ", batchSize=" + batchSize +
-            ", enrichmentFieldMap=" + enrichmentFieldMap +
-            ", threatIntelFieldMap" + threatIntelFieldMap +
-            ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
-            ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
-  }
-
-  @Override
-  public int hashCode() {
-    int result = getIndex() != null ? getIndex().hashCode() : 0;
-    result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
-    result = 31 * result + (getThreatIntelFieldMap() != null ? getThreatIntelFieldMap().hashCode() : 0);
-    result = 31 * result + (getFieldToEnrichmentTypeMap() != null ? getFieldToEnrichmentTypeMap().hashCode() : 0);
-    result = 31 * result + (getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().hashCode() : 0);
-    result = 31 * result + getBatchSize();
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java
new file mode 100644
index 0000000..af6c148
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.enrichment;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EnrichmentConfig {
+  private Map<String, List<String>> fieldMap = new HashMap<>();
+  private Map<String, List<String>> fieldToTypeMap = new HashMap<>();
+
+  public Map<String, List<String>> getFieldMap() {
+    return fieldMap;
+  }
+
+  public void setFieldMap(Map<String, List<String>> fieldMap) {
+    this.fieldMap = fieldMap;
+  }
+
+  public Map<String, List<String>> getFieldToTypeMap() {
+    return fieldToTypeMap;
+  }
+
+  public void setFieldToTypeMap(Map<String, List<String>> fieldToTypeMap) {
+    this.fieldToTypeMap = fieldToTypeMap;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    EnrichmentConfig that = (EnrichmentConfig) o;
+
+    if (getFieldMap() != null ? !getFieldMap().equals(that.getFieldMap()) : that.getFieldMap() != null) return false;
+    return getFieldToTypeMap() != null ? getFieldToTypeMap().equals(that.getFieldToTypeMap()) : that.getFieldToTypeMap() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getFieldMap() != null ? getFieldMap().hashCode() : 0;
+    result = 31 * result + (getFieldToTypeMap() != null ? getFieldToTypeMap().hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "EnrichmentConfig{" +
+            "fieldMap=" + fieldMap +
+            ", fieldToTypeMap=" + fieldToTypeMap +
+            '}';
+  }
+}