You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/10 16:53:57 UTC
[5/5] incubator-metron git commit: METRON-141: The ability to do
threat triage closes apache/incubator-metron#108
METRON-141: The ability to do threat triage closes apache/incubator-metron#108
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/deed21e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/deed21e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/deed21e6
Branch: refs/heads/master
Commit: deed21e6e8d8a8924c199c38e68c69481263d0d1
Parents: 743f37b
Author: cstella <ce...@gmail.com>
Authored: Tue May 10 12:53:35 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue May 10 12:53:35 2016 -0400
----------------------------------------------------------------------
.../files/config/sensors/bro.json | 25 +-
.../files/config/sensors/pcap.json | 26 +-
.../files/config/sensors/snort.json | 34 +-
.../files/config/sensors/yaf.json | 29 +-
.../metron_streaming/tasks/source_config.yml | 2 +-
metron-platform/metron-common/README.md | 186 +++
metron-platform/metron-common/pom.xml | 22 +
.../metron/common/query/generated/Predicate.g4 | 131 +++
.../src/main/java/Predicate.tokens | 31 +
.../src/main/java/PredicateLexer.tokens | 31 +
.../metron/common/aggregator/Aggregator.java | 26 +
.../metron/common/aggregator/Aggregators.java | 62 +
.../metron/common/bolt/ConfiguredBolt.java | 2 +-
.../metron/common/cli/ConfigurationManager.java | 200 ++++
.../metron/common/cli/ConfigurationsUtils.java | 232 ----
.../common/configuration/Configuration.java | 1 -
.../common/configuration/ConfigurationType.java | 75 ++
.../common/configuration/Configurations.java | 1 +
.../configuration/ConfigurationsUtils.java | 195 +++
.../common/configuration/EnrichmentConfig.java | 213 ----
.../configuration/SensorEnrichmentConfig.java | 129 --
.../enrichment/EnrichmentConfig.java | 71 ++
.../enrichment/SensorEnrichmentConfig.java | 108 ++
.../SensorEnrichmentUpdateConfig.java | 209 ++++
.../common/configuration/enrichment/Type.java | 24 +
.../threatintel/ThreatIntelConfig.java | 59 +
.../threatintel/ThreatTriageConfig.java | 101 ++
.../apache/metron/common/query/BooleanOp.java | 23 +
.../metron/common/query/ErrorListener.java | 50 +
.../metron/common/query/FunctionMarker.java | 21 +
.../metron/common/query/LogicalFunctions.java | 99 ++
.../common/query/MapVariableResolver.java | 33 +
.../metron/common/query/ParseException.java | 28 +
.../metron/common/query/PredicateProcessor.java | 59 +
.../metron/common/query/PredicateToken.java | 58 +
.../metron/common/query/QueryCompiler.java | 288 +++++
.../metron/common/query/StringFunctions.java | 42 +
.../metron/common/query/VariableResolver.java | 23 +
.../query/generated/PredicateBaseListener.java | 336 ++++++
.../common/query/generated/PredicateLexer.java | 178 +++
.../query/generated/PredicateListener.java | 299 +++++
.../common/query/generated/PredicateParser.java | 1108 ++++++++++++++++++
.../src/main/scripts/zk_load_configs.sh | 2 +-
.../metron/common/bolt/ConfiguredBoltTest.java | 8 +-
.../ConfigurationManagerIntegrationTest.java | 176 +++
.../common/cli/ConfigurationsUtilsTest.java | 13 +-
.../configuration/EnrichmentConfigTest.java | 211 ----
.../SensorEnrichmentConfigTest.java | 2 +-
.../SensorEnrichmentUpdateConfigTest.java | 224 ++++
.../metron/common/query/QueryParserTest.java | 152 +++
.../src/test/resources/config/sensors/bro.json | 30 +-
.../src/main/assembly/assembly.xml | 2 +-
.../src/main/bash/Whois_CSV_to_JSON.py | 208 ----
.../src/main/bash/flatfile_loader.sh | 42 -
.../main/bash/prune_elasticsearch_indices.sh | 21 -
.../src/main/bash/prune_hdfs_files.sh | 21 -
.../src/main/bash/threatintel_bulk_load.sh | 41 -
.../src/main/bash/threatintel_bulk_prune.sh | 40 -
.../src/main/bash/threatintel_taxii_load.sh | 42 -
.../dataloads/bulk/ThreatIntelBulkLoader.java | 12 +-
.../SimpleEnrichmentFlatFileLoader.java | 12 +-
.../dataloads/nonbulk/taxii/TaxiiLoader.java | 10 +-
.../src/main/scripts/Whois_CSV_to_JSON.py | 208 ++++
.../src/main/scripts/flatfile_loader.sh | 42 +
.../main/scripts/prune_elasticsearch_indices.sh | 21 +
.../src/main/scripts/prune_hdfs_files.sh | 21 +
.../src/main/scripts/threatintel_bulk_load.sh | 41 +
.../src/main/scripts/threatintel_bulk_prune.sh | 40 +
.../src/main/scripts/threatintel_taxii_load.sh | 42 +
.../writer/ElasticsearchWriter.java | 2 +-
metron-platform/metron-enrichment/pom.xml | 1 +
.../simplehbase/SimpleHBaseAdapter.java | 2 +-
.../threatintel/ThreatIntelAdapter.java | 4 +-
.../enrichment/bolt/BulkMessageWriterBolt.java | 2 +-
.../apache/metron/enrichment/bolt/CacheKey.java | 2 +-
.../enrichment/bolt/EnrichmentJoinBolt.java | 10 +-
.../enrichment/bolt/EnrichmentSplitterBolt.java | 4 +-
.../enrichment/bolt/GenericEnrichmentBolt.java | 2 +-
.../apache/metron/enrichment/bolt/JoinBolt.java | 33 +-
.../enrichment/bolt/ThreatIntelJoinBolt.java | 59 +-
.../bolt/ThreatIntelSplitterBolt.java | 4 +-
.../triage/ThreatTriageProcessor.java | 51 +
.../simplehbase/SimpleHBaseAdapterTest.java | 34 +-
.../threatintel/ThreatIntelAdapterTest.java | 42 +-
.../bolt/GenericEnrichmentBoltTest.java | 4 +-
.../bolt/ThreatIntelJoinBoltTest.java | 65 +-
.../threatintel/triage/ThreatTriageTest.java | 136 +++
.../metron/integration/BaseIntegrationTest.java | 2 +-
.../integration/EnrichmentIntegrationTest.java | 2 +
.../metron/integration/utils/SampleUtil.java | 2 +-
.../resources/sample/config/sensors/bro.json | 34 +-
.../resources/sample/config/sensors/pcap.json | 25 +-
.../resources/sample/config/sensors/snort.json | 37 +-
.../resources/sample/config/sensors/yaf.json | 62 +-
.../SolrEnrichmentIntegrationTest.java | 2 +-
metron-platform/style/checkstyle.xml | 10 +-
metron-platform/style/suppressions.xml | 27 +
pom.xml | 1 +
98 files changed, 5793 insertions(+), 1422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json b/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
index 2b534b4..0eb34b3 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/bro.json
@@ -1,19 +1,20 @@
{
"index": "bro",
"batchSize": 5,
- "enrichmentFieldMap":
- {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
+ "enrichment" : {
+ "fieldMap": {
+ "geo": ["ip_dst_addr", "ip_src_addr"],
+ "host": ["host"]
+ }
},
- "threatIntelFieldMap":
- {
- "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
- },
- "fieldToThreatIntelTypeMap":
- {
- "ip_src_addr" : ["malicious_ip"],
- "ip_dst_addr" : ["malicious_ip"]
+ "threatIntel": {
+ "fieldMap": {
+ "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
+ },
+ "fieldToTypeMap": {
+ "ip_src_addr" : ["malicious_ip"],
+ "ip_dst_addr" : ["malicious_ip"]
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json b/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
index 7792165..8a3eab1 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/pcap.json
@@ -1,19 +1,23 @@
{
"index": "pcap",
"batchSize": 5,
- "enrichmentFieldMap":
- {
- "geo": ["ip_src_addr", "ip_dst_addr"],
- "host": ["ip_src_addr", "ip_dst_addr"]
+ "enrichment" : {
+ "fieldMap":
+ {
+ "geo": ["ip_src_addr", "ip_dst_addr"],
+ "host": ["ip_src_addr", "ip_dst_addr"]
+ }
},
- "threatIntelFieldMap":
- {
- "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
- },
- "fieldToThreatIntelTypeMap":
- {
- "ip_dst_addr" : [ "malicious_ip" ]
+ "threatIntel" : {
+ "fieldMap":
+ {
+ "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
+ },
+ "fieldToTypeMap":
+ {
+ "ip_dst_addr" : [ "malicious_ip" ]
,"ip_src_addr" : [ "malicious_ip" ]
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json b/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
index c5b6dcc..9dfc80e 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/snort.json
@@ -1,18 +1,28 @@
{
"index": "snort",
"batchSize": 1,
- "enrichmentFieldMap":
- {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
+ "enrichment" : {
+ "fieldMap":
+ {
+ "geo": ["ip_dst_addr", "ip_src_addr"],
+ "host": ["host"]
+ }
},
- "threatIntelFieldMap":
- {
- "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
- },
- "fieldToThreatIntelTypeMap":
- {
- "ip_src_addr" : ["malicious_ip"],
- "ip_dst_addr" : ["malicious_ip"]
+ "threatIntel" : {
+ "fieldMap":
+ {
+ "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
+ },
+ "fieldToTypeMap":
+ {
+ "ip_src_addr" : ["malicious_ip"],
+ "ip_dst_addr" : ["malicious_ip"]
+ },
+ "triageConfig" : {
+ "riskLevelRules" : {
+ "not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))" : 10
+ },
+ "aggregator" : "MAX"
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json b/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
index 2b46c9a..4e67748 100644
--- a/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
+++ b/metron-deployment/roles/metron_streaming/files/config/sensors/yaf.json
@@ -1,19 +1,22 @@
{
"index": "yaf",
"batchSize": 5,
- "enrichmentFieldMap":
- {
- "geo": ["ip_dst_addr", "ip_src_addr"],
- "host": ["host"]
+ "enrichment" : {
+ "fieldMap":
+ {
+ "geo": ["ip_dst_addr", "ip_src_addr"],
+ "host": ["host"]
+ }
},
- "threatIntelFieldMap":
- {
- "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
- },
- "fieldToThreatIntelTypeMap":
- {
- "ip_src_addr" : ["malicious_ip"],
- "ip_dst_addr" : ["malicious_ip"]
+ "threatIntel": {
+ "fieldMap":
+ {
+ "hbaseThreatIntel": ["ip_src_addr", "ip_dst_addr"]
+ },
+ "fieldToTypeMap":
+ {
+ "ip_src_addr" : ["malicious_ip"],
+ "ip_dst_addr" : ["malicious_ip"]
+ }
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-deployment/roles/metron_streaming/tasks/source_config.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/source_config.yml b/metron-deployment/roles/metron_streaming/tasks/source_config.yml
index f3039af..21292b1 100644
--- a/metron-deployment/roles/metron_streaming/tasks/source_config.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/source_config.yml
@@ -43,7 +43,7 @@
- ../roles/metron_streaming/files/config/
- name: Load Config
- shell: "{{ metron_directory }}/bin/zk_load_configs.sh -p {{ zookeeper_config_path }} -z {{ zookeeper_url }} && touch {{ zookeeper_config_path }}/configured"
+ shell: "{{ metron_directory }}/bin/zk_load_configs.sh --mode PUSH -i {{ zookeeper_config_path }} -z {{ zookeeper_url }} && touch {{ zookeeper_config_path }}/configured"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
new file mode 100644
index 0000000..60bd287
--- /dev/null
+++ b/metron-platform/metron-common/README.md
@@ -0,0 +1,186 @@
+#Query Language
+
+For a variety of components (currently only threat intelligence triage) we have the need to determine if a condition is true of the JSON documents being enriched. For those purposes, there exists a simple DSL created to define those conditions.
+
+The query language supports the following:
+* Referencing fields in the enriched JSON
+* Simple boolean operations: and, not, or
+* Determining whether a field exists (via `exists`)
+* The ability to have parenthesis to make order of operations explicit
+* A fixed set of functions which take strings and return boolean. Currently:
+ * `IN_SUBNET(ip, cidr1, cidr2, ...)`
+ * `IS_EMPTY(str)`
+ * `STARTS_WITH(str, prefix)`
+ * `ENDS_WITH(str, suffix)`
+ * `REGEXP_MATCH(str, pattern)`
+* A fixed set of string to string transformation functions:
+ * `TO_LOWER`
+ * `TO_UPPER`
+ * `TRIM`
+
+Example query:
+
+`IN_SUBNET( ip, '192.168.0.0/24') or ip in [ '10.0.0.1', '10.0.0.2' ] or exists(is_local)`
+
+This evaluates to true precisely when one of the following is true:
+* The value of the `ip` field is in the `192.168.0.0/24` subnet
+* The value of the `ip` field is `10.0.0.1` or `10.0.0.2`
+* The field `is_local` exists
+
+#Enrichment Configuration
+
+The configuration for the `enrichment` topology, the topology primarily
+responsible for enrichment and threat intelligence enrichment, is
+defined by JSON documents stored in zookeeper.
+
+There are two types of configurations at the moment, `global` and
+`sensor` specific.
+
+##Global Configuration
+The format of the global enrichment is a JSON String to Object map. This is intended for
+configuration which is non sensor specific configuration.
+
+This configuration is stored in zookeeper, but looks something like
+
+```json
+{
+ "es.clustername": "metron",
+ "es.ip": "node1",
+ "es.port": "9300",
+ "es.date.format": "yyyy.MM.dd.HH"
+}
+```
+
+##Sensor Enrichment Configuration
+
+The sensor specific configuration is intended to configure the
+individual enrichments and threat intelligence enrichments for a given
+sensor type (e.g. `snort`).
+
+Just like the global config, the format is a JSON stored in zookeeper.
+The configuration is a complex JSON object with the following top level fields:
+
+* `index` : The name of the sensor
+* `batchSize` : The size of the batch that is written to the indices at once.
+* `enrichment` : A complex JSON object representing the configuration of the enrichments
+* `threatIntel` : A complex JSON object representing the configuration of the threat intelligence enrichments
+
+###The `enrichment` Configuration
+
+
+| Field | Description | Example |
+|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------|
+| `fieldToTypeMap` | In the case of a simple HBase enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known. This enrichment type is used as part of the HBase key. | `"fieldToTypeMap" : { "ip_src_addr" : [ "asset_enrichment" ] }` |
+| `fieldMap` | The map of enrichment bolts names to fields in the JSON messages.,Each field is sent to the enrichment referenced in the key. | `"fieldMap": {"hbaseEnrichment": ["ip_src_addr","ip_dst_addr"]}` |
+
+###The `threatIntel` Configuration
+
+| Field | Description | Example |
+|------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
+| `fieldToTypeMap` | In the case of a simple HBase threat intel enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known. This enrichment type is used as part of the HBase key. | `"fieldToTypeMap" : { "ip_src_addr" : [ "malicious_ips" ] }` |
+| `fieldMap` | The map of threat intel enrichment bolts names to fields in the JSON messages. Each field is sent to the threat intel enrichment bolt referenced in the key. | `"fieldMap": {"hbaseThreatIntel": ["ip_src_addr","ip_dst_addr"]}` |
+| `triageConfig` | The configuration of the threat triage scorer. In the situation where a threat is detected, a score is assigned to the message and embedded in the indexed message. | `"riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 }` |
+
+The `triageConfig` field is also a complex field and it bears some description:
+
+| Field | Description | Example |
+|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------|
+| `riskLevelRules` | The mapping of Metron Query Language (see above) queries to a score. | `"riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 }` |
+| `aggregator` | An aggregation function that takes all non-zero scores representing the matching queries from `riskLevelRules` and aggregates them into a single score. | `"MAX"` |
+
+The supported aggregation functions are:
+* `MAX` : The max of all of the associated values for matching queries
+* `MIN` : The min of all of the associated values for matching queries
+* `MEAN` : The mean of all of the associated values for matching queries
+* `POSITIVE_MEAN` : The mean of the positive associated values for the matching queries.
+
+###Example
+
+An example configuration for the YAF sensor is as follows:
+```json
+{
+ "index": "yaf",
+ "batchSize": 5,
+ "enrichment": {
+ "fieldMap": {
+ "geo": [
+ "ip_src_addr",
+ "ip_dst_addr"
+ ],
+ "host": [
+ "ip_src_addr",
+ "ip_dst_addr"
+ ],
+ "hbaseEnrichment": [
+ "ip_src_addr",
+ "ip_dst_addr"
+ ]
+ }
+ ,"fieldToTypeMap": {
+ "ip_src_addr": [
+ "playful_classification"
+ ],
+ "ip_dst_addr": [
+ "playful_classification"
+ ]
+ }
+ },
+ "threatIntel": {
+ "fieldMap": {
+ "hbaseThreatIntel": [
+ "ip_src_addr",
+ "ip_dst_addr"
+ ]
+ },
+ "fieldToTypeMap": {
+ "ip_src_addr": [
+ "malicious_ip"
+ ],
+ "ip_dst_addr": [
+ "malicious_ip"
+ ]
+ },
+ "triageConfig" : {
+ "riskLevelRules" : {
+ "ip_src_addr == '10.0.2.3' or ip_dst_addr == '10.0.2.3'" : 10
+ },
+ "aggregator" : "MAX"
+ }
+ }
+}
+```
+
+
+##Management Utility
+
+Configurations should be stored on disk in the following structure starting at `$BASE_DIR`:
+* global.json : The global config
+* `sensors` : The subdirectory containing sensor enrichment configuration JSON (e.g. `snort.json`, `bro.json`)
+
+By default, this directory as deployed by the ansible infrastructure is at `$METRON_HOME/config/zookeeper`
+
+While the configs are stored on disk, they must be loaded into Zookeeper to be used. To this end, there is a
+utility program to assist in this called `$METRON_HOME/bin/zk_load_config.sh`
+
+This has the following options:
+
+```
+ -f,--force Force operation
+ -h,--help Generate Help screen
+ -i,--input_dir <DIR> The input directory containing
+ the configuration files named
+ like "$source.json"
+ -m,--mode <MODE> The mode of operation: DUMP,
+ PULL, PUSH
+ -o,--output_dir <DIR> The output directory which will
+ store the JSON configuration
+ from Zookeeper
+ -z,--zk_quorum <host:port,[host:port]*> Zookeeper Quorum URL
+ (zk1:port,zk2:port,...)
+```
+
+Usage examples:
+
+* To dump the existing configs from zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m DUMP`
+* To push the configs into zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PUSH -i $METRON_HOME/config/zookeeper`
+* To pull the configs from zookeeper to the singlenode vagrant machine disk: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PULL -o $METRON_HOME/config/zookeeper -f`
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 91d50fe..9d439e0 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -27,6 +27,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<commons.config.version>1.10</commons.config.version>
+ <antlr.version>4.5</antlr.version>
</properties>
<repositories>
<repository>
@@ -47,6 +48,11 @@
<version>${global_json_simple_version}</version>
</dependency>
<dependency>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ <version>${antlr.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${global_storm_version}</version>
@@ -236,6 +242,22 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <version>${antlr.version}</version>
+ <configuration>
+ <outputDirectory>${basedir}/src/main/java</outputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${global_shade_version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4 b/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4
new file mode 100644
index 0000000..73c60cd
--- /dev/null
+++ b/metron-platform/metron-common/src/main/antlr4/org/apache/metron/common/query/generated/Predicate.g4
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar Predicate;
+
+@header {
+//CHECKSTYLE:OFF
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+}
+
+/* Lexical rules */
+
+AND : 'and'
+ | '&&'
+ | 'AND'
+ ;
+OR : 'or'
+ | '||'
+ | 'OR';
+
+NOT : 'not'
+ | 'NOT';
+
+TRUE : 'true'
+ | 'TRUE' ;
+
+FALSE : 'false'
+ | 'FALSE';
+
+EQ : '==' ;
+NEQ : '!=' ;
+COMMA : ',';
+
+LBRACKET : '[';
+RBRACKET : ']';
+LPAREN : '(' ;
+RPAREN : ')' ;
+
+IN : 'in'
+ ;
+NIN : 'not in'
+ ;
+EXISTS : 'exists';
+IDENTIFIER : [a-zA-Z_][a-zA-Z_\.0-9]* ;
+fragment SCHAR: ~['"\\\r\n];
+STRING_LITERAL : '"' SCHAR* '"'
+ | '\'' SCHAR* '\'' ;
+SEMI : ';' ;
+
+
+// COMMENT and WS are stripped from the output token stream by sending
+// to a different channel 'skip'
+
+COMMENT : '//' .+? ('\n'|EOF) -> skip ;
+
+WS : [ \r\t\u000C\n]+ -> skip ;
+
+
+/* Parser rules */
+
+single_rule : logical_expr EOF;
+
+logical_expr
+ : logical_expr AND logical_expr # LogicalExpressionAnd
+ | logical_expr OR logical_expr # LogicalExpressionOr
+ | comparison_expr # ComparisonExpression
+ | LPAREN logical_expr RPAREN # LogicalExpressionInParen
+ | NOT LPAREN logical_expr RPAREN #NotFunc
+ | logical_entity # LogicalEntity
+ ;
+
+comparison_expr : comparison_operand comp_operator comparison_operand # ComparisonExpressionWithOperator
+ | identifier_operand IN list_entity #InExpression
+ | identifier_operand NIN list_entity #NInExpression
+ | LPAREN comparison_expr RPAREN # ComparisonExpressionParens
+ ;
+
+logical_entity : (TRUE | FALSE) # LogicalConst
+ | EXISTS LPAREN IDENTIFIER RPAREN #ExistsFunc
+ | IDENTIFIER LPAREN func_args RPAREN #LogicalFunc
+ ;
+
+list_entity : LBRACKET op_list RBRACKET
+ ;
+func_args : op_list
+ ;
+op_list : identifier_operand
+ | op_list COMMA identifier_operand
+ ;
+identifier_operand : STRING_LITERAL # StringLiteral
+ | IDENTIFIER # LogicalVariable
+ | IDENTIFIER LPAREN func_args RPAREN #StringFunc
+ ;
+
+comparison_operand : identifier_operand #IdentifierOperand
+ | logical_entity # LogicalConstComparison
+ ;
+
+comp_operator : (EQ | NEQ) # ComparisonOp
+ ;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/Predicate.tokens
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/Predicate.tokens b/metron-platform/metron-common/src/main/java/Predicate.tokens
new file mode 100644
index 0000000..e0726f9
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/Predicate.tokens
@@ -0,0 +1,31 @@
+AND=1
+OR=2
+NOT=3
+TRUE=4
+FALSE=5
+EQ=6
+NEQ=7
+COMMA=8
+LBRACKET=9
+RBRACKET=10
+LPAREN=11
+RPAREN=12
+IN=13
+NIN=14
+EXISTS=15
+IDENTIFIER=16
+STRING_LITERAL=17
+SEMI=18
+COMMENT=19
+WS=20
+'=='=6
+'!='=7
+','=8
+'['=9
+']'=10
+'('=11
+')'=12
+'in'=13
+'not in'=14
+'exists'=15
+';'=18
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/PredicateLexer.tokens
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/PredicateLexer.tokens b/metron-platform/metron-common/src/main/java/PredicateLexer.tokens
new file mode 100644
index 0000000..e0726f9
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/PredicateLexer.tokens
@@ -0,0 +1,31 @@
+AND=1
+OR=2
+NOT=3
+TRUE=4
+FALSE=5
+EQ=6
+NEQ=7
+COMMA=8
+LBRACKET=9
+RBRACKET=10
+LPAREN=11
+RPAREN=12
+IN=13
+NIN=14
+EXISTS=15
+IDENTIFIER=16
+STRING_LITERAL=17
+SEMI=18
+COMMENT=19
+WS=20
+'=='=6
+'!='=7
+','=8
+'['=9
+']'=10
+'('=11
+')'=12
+'in'=13
+'not in'=14
+'exists'=15
+';'=18
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java
new file mode 100644
index 0000000..d7ca31e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.aggregator;
+
+import java.util.List;
+import java.util.Map;
+
+public interface Aggregator {
+ Double aggregate(List<Number> scores, Map<String, Object> config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java
new file mode 100644
index 0000000..35b7ada
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/aggregator/Aggregators.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.aggregator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+import java.util.function.Predicate;
+
+public enum Aggregators implements Aggregator {
+ MAX( (numbers, config) -> accumulate(0d, (x,y) -> Math.max(x.doubleValue(),y.doubleValue()), numbers))
+ ,MIN( (numbers, config) -> accumulate(0d, (x,y) -> Math.min(x.doubleValue(),y.doubleValue()), numbers))
+ ,SUM( (numbers, config) -> accumulate(0d, (x,y) -> x.doubleValue() + y.doubleValue(), numbers))
+ ,MEAN( (numbers, config) -> scale(SUM.aggregate(numbers, config), numbers, n -> true))
+ ,POSITIVE_MEAN( (numbers, config) -> scale(SUM.aggregate(numbers, config), numbers, n -> n.doubleValue() > 0))
+ ;
+ Aggregator aggregator;
+ Aggregators(Aggregator agg) {
+ aggregator = agg;
+ }
+ public Aggregator getAggregator() {
+ return aggregator;
+ }
+
+ private static double accumulate(double initial, BinaryOperator<Number> op, List<Number> list) {
+ if(list.isEmpty()) {
+ return 0d;
+ }
+ return list.stream()
+ .reduce(initial, op)
+ .doubleValue();
+ }
+
+ private static double scale(double numberToScale, List<Number> list, Predicate<Number> filterFunc) {
+ double scale = list.stream().filter(filterFunc).count();
+ if(scale < 1e-5) {
+ scale = 1;
+ }
+ return numberToScale / scale;
+ }
+
+ @Override
+ public Double aggregate(List<Number> scores, Map<String, Object> config) {
+ return aggregator.aggregate(scores, config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index aa654fb..1364305 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -30,7 +30,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.cli.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
import java.io.IOException;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java
new file mode 100644
index 0000000..9685841
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.cli;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.io.Files;
+import org.apache.commons.cli.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ConfigurationType;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+public class ConfigurationManager {
+ public enum ConfigurationOptions {
+ HELP("h", s -> new Option(s, "help", false, "Generate Help screen"))
+ ,INPUT("i", s -> OptionBuilder.isRequired(false)
+ .withLongOpt("input_dir")
+ .hasArg()
+ .withArgName("DIR")
+ .withDescription("The input directory containing the configuration files named like \"$source.json\"")
+ .create(s)
+ )
+ ,OUTPUT("o", s -> OptionBuilder.isRequired(false)
+ .hasArg()
+ .withLongOpt("output_dir")
+ .withArgName("DIR")
+ .withDescription("The output directory which will store the JSON configuration from Zookeeper")
+ .create(s)
+ )
+ ,ZK_QUORUM("z", s -> OptionBuilder.isRequired(true)
+ .hasArg()
+ .withLongOpt("zk_quorum")
+ .withArgName("host:port,[host:port]*")
+ .withDescription("Zookeeper Quorum URL (zk1:port,zk2:port,...)")
+ .create(s)
+ )
+ ,MODE("m", s -> OptionBuilder.isRequired(true)
+ .hasArg()
+ .withLongOpt("mode")
+ .withArgName("MODE")
+ .withDescription("The mode of operation: DUMP, PULL, PUSH")
+ .create(s)
+ )
+ ,FORCE("f", s -> new Option(s, "force", false, "Force operation"))
+ ;
+ Option option;
+ String shortCode;
+ ConfigurationOptions(String shortCode, Function<String, Option> optionHandler) {
+ this.shortCode = shortCode;
+ this.option = optionHandler.apply(shortCode);
+ }
+
+ public boolean has(CommandLine cli) {
+ return cli.hasOption(shortCode);
+ }
+
+ public String get(CommandLine cli) {
+ return cli.getOptionValue(shortCode);
+ }
+
+ public static CommandLine parse(CommandLineParser parser, String[] args) {
+ try {
+ CommandLine cli = parser.parse(getOptions(), args);
+ if(ConfigurationOptions.HELP.has(cli)) {
+ printHelp();
+ System.exit(0);
+ }
+ return cli;
+ } catch (ParseException e) {
+ System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+ e.printStackTrace(System.err);
+ printHelp();
+ System.exit(-1);
+ return null;
+ }
+ }
+
+ public static void printHelp() {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp( "configuration_manager", getOptions());
+ }
+
+ public static Options getOptions() {
+ Options ret = new Options();
+ for(ConfigurationOptions o : ConfigurationOptions.values()) {
+ ret.addOption(o.option);
+ }
+ return ret;
+ }
+ }
+
+ public void dump(CuratorFramework client) throws Exception {
+ ConfigurationsUtils.dumpConfigs(System.out, client);
+ }
+
+
+
+ public void pull(CuratorFramework client, String outFileStr, final boolean force) throws Exception {
+ final File outputDir = new File(outFileStr);
+ if (!outputDir.exists()) {
+ if (!outputDir.mkdirs()) {
+ throw new IllegalStateException("Unable to make directories: " + outputDir.getAbsolutePath());
+ }
+ }
+
+ ConfigurationsUtils.visitConfigs(client, new ConfigurationsUtils.ConfigurationVisitor() {
+ @Override
+ public void visit(ConfigurationType configurationType, String name, String data) {
+ File out = getFile(outputDir, configurationType, name);
+ if (!out.exists() || force) {
+ if(!out.exists()) {
+ out.getParentFile().mkdirs();
+ }
+ try {
+ Files.write(data, out, Charset.defaultCharset());
+ } catch (IOException e) {
+ throw new RuntimeException("Sorry, something bad happened writing the config to " + out.getAbsolutePath() + ": " + e.getMessage(), e);
+ }
+ }
+ else if(out.exists() && !force) {
+ throw new IllegalStateException("Unable to overwrite existing file (" + out.getAbsolutePath() + ") without the force flag (-f or --force) being set.");
+ }
+ }
+ });
+ }
+
+ public void push(String inputDirStr, CuratorFramework client) throws Exception {
+ final File inputDir = new File(inputDirStr);
+
+ if(!inputDir.exists() || !inputDir.isDirectory()) {
+ throw new IllegalStateException("Input directory: " + inputDir + " does not exist or is not a directory.");
+ }
+ ConfigurationsUtils.uploadConfigsToZookeeper(inputDirStr, client);
+ }
+
+ public void run(CommandLine cli) throws Exception {
+ try(CuratorFramework client = ConfigurationsUtils.getClient(ConfigurationOptions.ZK_QUORUM.get(cli))) {
+ client.start();
+ run(client, cli);
+ }
+ }
+ public void run(CuratorFramework client, CommandLine cli) throws Exception {
+ final boolean force = ConfigurationOptions.FORCE.has(cli);
+ String mode = ConfigurationOptions.MODE.get(cli);
+
+ if (mode.toLowerCase().equals("push")) {
+ String inputDirStr = ConfigurationOptions.INPUT.get(cli);
+ push(inputDirStr, client);
+ }
+ else {
+
+ switch (mode.toLowerCase()) {
+
+ case "dump":
+ dump(client);
+ break;
+
+ case "pull":
+ pull(client, ConfigurationOptions.OUTPUT.get(cli), force);
+ break;
+
+ default:
+ throw new IllegalStateException("Invalid mode: " + mode + " expected DUMP, PULL or PUSH");
+ }
+ }
+ }
+
+ private static File getFile(File baseDir, ConfigurationType configurationType, String name) {
+ return new File(new File(baseDir, configurationType.getDirectory()), name + ".json");
+ }
+
+ public static void main(String... argv) throws Exception {
+ CommandLineParser parser = new PosixParser();
+ CommandLine cli = ConfigurationOptions.parse(parser, argv);
+ ConfigurationManager manager = new ConfigurationManager();
+ manager.run(cli);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
deleted file mode 100644
index 27f4c2a..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/cli/ConfigurationsUtils.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.cli;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.SensorEnrichmentConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ConfigurationsUtils {
-
- public static CuratorFramework getClient(String zookeeperUrl) {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
- }
-
- public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
- writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), zookeeperUrl);
- }
-
- public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
- CuratorFramework client = getClient(zookeeperUrl);
- client.start();
- try {
- writeGlobalConfigToZookeeper(globalConfig, client);
- }
- finally {
- client.close();
- }
- }
-
- public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
- writeToZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, globalConfig, client);
- }
-
- public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
- writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
- }
-
- public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
- CuratorFramework client = getClient(zookeeperUrl);
- client.start();
- try {
- writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
- }
- finally {
- client.close();
- }
- }
-
- public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
- writeToZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, configData, client);
- }
-
- public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
- writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
- }
-
- public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
- CuratorFramework client = getClient(zookeeperUrl);
- client.start();
- try {
- writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
- }
- finally {
- client.close();
- }
- }
-
- public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
- try {
- client.setData().forPath(path, configData);
- } catch (KeeperException.NoNodeException e) {
- client.create().creatingParentsIfNeeded().forPath(path, configData);
- }
- }
-
- public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
- configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
- List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
- for(String sensorType: sensorTypes) {
- configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
- }
- }
-
- public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
- return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
- }
-
- public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
- return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
- }
-
- public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
- return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
- }
-
- public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
- return client.getData().forPath(path);
- }
-
- public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
- ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), zookeeperUrl);
- Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
- for(String sensorType: sensorEnrichmentConfigs.keySet()) {
- ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), zookeeperUrl);
- }
- }
-
- public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
- return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
- }
-
- public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
- Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
- for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
- sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
- }
- return sensorEnrichmentConfigs;
- }
-
- public static void dumpConfigs(String zookeeperUrl) throws Exception {
- CuratorFramework client = getClient(zookeeperUrl);
- client.start();
- //Output global configs
- {
- System.out.println("Global config");
- byte[] globalConfigData = client.getData().forPath(Constants.ZOOKEEPER_GLOBAL_ROOT);
- System.out.println(new String(globalConfigData));
- }
- //Output sensor specific configs
- {
- List<String> children = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
- for (String child : children) {
- byte[] data = client.getData().forPath(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + child);
- System.out.println("Config for source " + child);
- System.out.println(new String(data));
- System.out.println();
- }
- }
- client.close();
- }
-
- public static void main(String[] args) {
-
- Options options = new Options();
- {
- Option o = new Option("h", "help", false, "This screen");
- o.setRequired(false);
- options.addOption(o);
- }
- {
- Option o = new Option("p", "config_files", true, "Path to the source config files. Must be named like \"$source\".json");
- o.setArgName("DIR_NAME");
- o.setRequired(false);
- options.addOption(o);
- }
- {
- Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
- o.setArgName("ZK_QUORUM");
- o.setRequired(true);
- options.addOption(o);
- }
-
- try {
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = null;
- try {
- cmd = parser.parse(options, args);
- } catch (ParseException pe) {
- pe.printStackTrace();
- final HelpFormatter usageFormatter = new HelpFormatter();
- usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
- System.exit(-1);
- }
- if (cmd.hasOption("h")) {
- final HelpFormatter usageFormatter = new HelpFormatter();
- usageFormatter.printHelp("ConfigurationsUtils", null, options, null, true);
- System.exit(0);
- }
-
- String zkQuorum = cmd.getOptionValue("z");
- if (cmd.hasOption("p")) {
- String sourcePath = cmd.getOptionValue("p");
- uploadConfigsToZookeeper(sourcePath, zkQuorum);
- }
-
- ConfigurationsUtils.dumpConfigs(zkQuorum);
-
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(-1);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
index 1ccf47b..e526ee4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configuration.java
@@ -18,7 +18,6 @@
package org.apache.metron.common.configuration;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.cli.ConfigurationsUtils;
import java.nio.file.Path;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
new file mode 100644
index 0000000..2b9f6cf
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.Map;
+
+public enum ConfigurationType implements Function<String, Object> {
+ GLOBAL("."
+ ,Constants.ZOOKEEPER_GLOBAL_ROOT
+ , s -> {
+ try {
+ return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+ });
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to load " + s, e);
+ }
+ })
+ , SENSOR(Constants.SENSORS_CONFIG_NAME
+ ,Constants.ZOOKEEPER_SENSOR_ROOT
+ , s -> {
+ try {
+ return JSONUtils.INSTANCE.load(s, SensorEnrichmentConfig.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to load " + s, e);
+ }
+ });
+ String directory;
+ String zookeeperRoot;
+ Function<String,?> deserializer;
+ ConfigurationType(String directory, String zookeeperRoot, Function<String, ?> deserializer) {
+ this.directory = directory;
+ this.zookeeperRoot = zookeeperRoot;
+ this.deserializer = deserializer;
+ }
+
+ public String getDirectory() {
+ return directory;
+ }
+
+ public Object deserialize(String s)
+ {
+ return deserializer.apply(s);
+ }
+ @Override
+ public Object apply(String s) {
+ return deserialize(s);
+ }
+
+ public String getZookeeperRoot() {
+ return zookeeperRoot;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
index 6aaa2b4..a152d40 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -19,6 +19,7 @@ package org.apache.metron.common.configuration;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.utils.JSONUtils;
import java.io.ByteArrayInputStream;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
new file mode 100644
index 0000000..1aa2ca8
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConfigurationsUtils {
+
+ public static CuratorFramework getClient(String zookeeperUrl) {
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+ }
+
+ public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, String zookeeperUrl) throws Exception {
+ try(CuratorFramework client = getClient(zookeeperUrl)) {
+ client.start();
+ writeGlobalConfigToZookeeper(globalConfig, client);
+ }
+ }
+ public static void writeGlobalConfigToZookeeper(Map<String, Object> globalConfig, CuratorFramework client) throws Exception {
+ writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), client);
+ }
+
+ public static void writeGlobalConfigToZookeeper(byte[] globalConfig, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ writeGlobalConfigToZookeeper(globalConfig, client);
+ }
+ finally {
+ client.close();
+ }
+ }
+
+ public static void writeGlobalConfigToZookeeper(byte[] globalConfig, CuratorFramework client) throws Exception {
+ ConfigurationType.GLOBAL.deserialize(new String(globalConfig));
+ writeToZookeeper(ConfigurationType.GLOBAL.getZookeeperRoot(), globalConfig, client);
+ }
+
+ public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
+ writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
+ }
+
+ public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ writeSensorEnrichmentConfigToZookeeper(sensorType, configData, client);
+ }
+ finally {
+ client.close();
+ }
+ }
+
+ public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+ ConfigurationType.SENSOR.deserialize(new String(configData));
+ writeToZookeeper(ConfigurationType.SENSOR.getZookeeperRoot()+ "/" + sensorType, configData, client);
+ }
+
+ public static void writeConfigToZookeeper(String name, Map<String, Object> config, String zookeeperUrl) throws Exception {
+ writeConfigToZookeeper(name, JSONUtils.INSTANCE.toJSON(config), zookeeperUrl);
+ }
+
+ public static void writeConfigToZookeeper(String name, byte[] config, String zookeeperUrl) throws Exception {
+ CuratorFramework client = getClient(zookeeperUrl);
+ client.start();
+ try {
+ writeToZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, config, client);
+ }
+ finally {
+ client.close();
+ }
+ }
+
+ public static void writeToZookeeper(String path, byte[] configData, CuratorFramework client) throws Exception {
+ try {
+ client.setData().forPath(path, configData);
+ } catch (KeeperException.NoNodeException e) {
+ client.create().creatingParentsIfNeeded().forPath(path, configData);
+ }
+ }
+
+ public static void updateConfigsFromZookeeper(Configurations configurations, CuratorFramework client) throws Exception {
+ configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
+ List<String> sensorTypes = client.getChildren().forPath(Constants.ZOOKEEPER_SENSOR_ROOT);
+ for(String sensorType: sensorTypes) {
+ configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+ }
+ }
+
+ public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
+ return readFromZookeeper(Constants.ZOOKEEPER_GLOBAL_ROOT, client);
+ }
+
+ public static byte[] readSensorEnrichmentConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+ return readFromZookeeper(Constants.ZOOKEEPER_SENSOR_ROOT + "/" + sensorType, client);
+ }
+
+ public static byte[] readConfigBytesFromZookeeper(String name, CuratorFramework client) throws Exception {
+ return readFromZookeeper(Constants.ZOOKEEPER_TOPOLOGY_ROOT + "/" + name, client);
+ }
+
+ public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
+ return client.getData().forPath(path);
+ }
+
+ public static void uploadConfigsToZookeeper(String rootFilePath, CuratorFramework client) throws Exception {
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(readGlobalConfigFromFile(rootFilePath), client);
+ Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(rootFilePath);
+ for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+ ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensorType, sensorEnrichmentConfigs.get(sensorType), client);
+ }
+ }
+
+ public static void uploadConfigsToZookeeper(String rootFilePath, String zookeeperUrl) throws Exception {
+ try(CuratorFramework client = getClient(zookeeperUrl)) {
+ client.start();
+ uploadConfigsToZookeeper(rootFilePath, client);
+ }
+ }
+
+ public static byte[] readGlobalConfigFromFile(String rootFilePath) throws IOException {
+ return Files.readAllBytes(Paths.get(rootFilePath, Constants.GLOBAL_CONFIG_NAME + ".json"));
+ }
+
+ public static Map<String, byte[]> readSensorEnrichmentConfigsFromFile(String rootPath) throws IOException {
+ Map<String, byte[]> sensorEnrichmentConfigs = new HashMap<>();
+ for(File file: new File(rootPath, Constants.SENSORS_CONFIG_NAME).listFiles()) {
+ if(file.getName().endsWith(".json")) {
+ sensorEnrichmentConfigs.put(FilenameUtils.removeExtension(file.getName()), Files.readAllBytes(file.toPath()));
+ }
+ }
+ return sensorEnrichmentConfigs;
+ }
+
+ public interface ConfigurationVisitor{
+ void visit(ConfigurationType configurationType, String name, String data);
+ }
+ public static void visitConfigs(CuratorFramework client, ConfigurationVisitor callback) throws Exception {
+ //Output global configs
+ {
+ ConfigurationType configType = ConfigurationType.GLOBAL;
+ byte[] globalConfigData = client.getData().forPath(configType.getZookeeperRoot());
+ callback.visit(configType, "global", new String(globalConfigData));
+ }
+ //Output sensor specific configs
+ {
+ ConfigurationType configType = ConfigurationType.SENSOR;
+ List<String> children = client.getChildren().forPath(configType.getZookeeperRoot());
+ for (String child : children) {
+ byte[] data = client.getData().forPath(configType.getZookeeperRoot() + "/" + child);
+ callback.visit(configType, child, new String(data));
+ }
+ }
+ }
+ public static void dumpConfigs(PrintStream out, CuratorFramework client) throws Exception {
+ ConfigurationsUtils.visitConfigs(client, (type, name, data) -> {
+ type.deserialize(data);
+ out.println(type + " Config: " + name + "\n" + data);
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
deleted file mode 100644
index bcc91fa..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfig.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.configuration;
-
-import com.google.common.base.Joiner;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.cli.ConfigurationsUtils;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class EnrichmentConfig {
- public static enum Type {
- THREAT_INTEL
- ,ENRICHMENT
- }
-
- protected static final Logger _LOG = LoggerFactory.getLogger(EnrichmentConfig.class);
- public static class FieldList {
- Type type;
- Map<String, List<String>> fieldToEnrichmentTypes;
-
- public Type getType() {
- return type;
- }
-
- public void setType(Type type) {
- this.type = type;
- }
-
- public Map<String, List<String>> getFieldToEnrichmentTypes() {
- return fieldToEnrichmentTypes;
- }
-
- public void setFieldToEnrichmentTypes(Map<String, List<String>> fieldToEnrichmentTypes) {
- this.fieldToEnrichmentTypes = fieldToEnrichmentTypes;
- }
- }
- public String zkQuorum;
- public Map<String, FieldList> sensorToFieldList;
-
- public String getZkQuorum() {
- return zkQuorum;
- }
-
- public void setZkQuorum(String zkQuorum) {
- this.zkQuorum = zkQuorum;
- }
-
- public Map<String, FieldList> getSensorToFieldList() {
- return sensorToFieldList;
- }
-
- public void setSensorToFieldList(Map<String, FieldList> sensorToFieldList) {
- this.sensorToFieldList = sensorToFieldList;
- }
-
- public void updateSensorConfigs( ) throws Exception {
- CuratorFramework client = ConfigurationsUtils.getClient(getZkQuorum());
- try {
- client.start();
- updateSensorConfigs(new ZKSourceConfigHandler(client), sensorToFieldList);
- }
- finally {
- client.close();
- }
- }
-
- public static interface SourceConfigHandler {
- SensorEnrichmentConfig readConfig(String sensor) throws Exception;
- void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception;
- }
-
- public static class ZKSourceConfigHandler implements SourceConfigHandler {
- CuratorFramework client;
- public ZKSourceConfigHandler(CuratorFramework client) {
- this.client = client;
- }
- @Override
- public SensorEnrichmentConfig readConfig(String sensor) throws Exception {
- SensorEnrichmentConfig sensorEnrichmentConfig = new SensorEnrichmentConfig();
- try {
- sensorEnrichmentConfig = SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
- }catch (KeeperException.NoNodeException e) {
- sensorEnrichmentConfig.setIndex(sensor);
- sensorEnrichmentConfig.setBatchSize(1);
- }
- return sensorEnrichmentConfig;
- }
-
- @Override
- public void persistConfig(String sensor, SensorEnrichmentConfig config) throws Exception {
- ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.toJSON().getBytes(), client);
- }
- }
-
- public static void updateSensorConfigs( SourceConfigHandler scHandler
- , Map<String, FieldList> sensorToFieldList
- ) throws Exception
- {
- Map<String, SensorEnrichmentConfig> sourceConfigsChanged = new HashMap<>();
- for (Map.Entry<String, FieldList> kv : sensorToFieldList.entrySet()) {
- SensorEnrichmentConfig config = sourceConfigsChanged.get(kv.getKey());
- if(config == null) {
- config = scHandler.readConfig(kv.getKey());
- if(_LOG.isDebugEnabled()) {
- _LOG.debug(config.toJSON());
- }
- }
- Map<String, List<String> > fieldMap = null;
- Map<String, List<String> > fieldToTypeMap = null;
- List<String> fieldList = null;
- if(kv.getValue().type == Type.THREAT_INTEL) {
- fieldMap = config.getThreatIntelFieldMap();
- if(fieldMap!= null) {
- fieldList = fieldMap.get(Constants.SIMPLE_HBASE_THREAT_INTEL);
- } else {
- fieldMap = new HashMap<>();
- }
- if(fieldList == null) {
- fieldList = new ArrayList<>();
- fieldMap.put(Constants.SIMPLE_HBASE_THREAT_INTEL, fieldList);
- }
- fieldToTypeMap = config.getFieldToThreatIntelTypeMap();
- if(fieldToTypeMap == null) {
- fieldToTypeMap = new HashMap<>();
- config.setFieldToThreatIntelTypeMap(fieldToTypeMap);
- }
- }
- else if(kv.getValue().type == Type.ENRICHMENT) {
- fieldMap = config.getEnrichmentFieldMap();
- if(fieldMap!= null) {
- fieldList = fieldMap.get(Constants.SIMPLE_HBASE_ENRICHMENT);
- } else {
- fieldMap = new HashMap<>();
- }
- if(fieldList == null) {
- fieldList = new ArrayList<>();
- fieldMap.put(Constants.SIMPLE_HBASE_ENRICHMENT, fieldList);
- }
- fieldToTypeMap = config.getFieldToEnrichmentTypeMap();
- if(fieldToTypeMap == null) {
- fieldToTypeMap = new HashMap<>();
- config.setFieldToEnrichmentTypeMap(fieldToTypeMap);
- }
- }
- if(fieldToTypeMap == null || fieldMap == null) {
- _LOG.debug("fieldToTypeMap is null or fieldMap is null, so skipping");
- continue;
- }
- //Add the additional fields to the field list associated with the hbase adapter
- {
- HashSet<String> fieldSet = new HashSet<>(fieldList);
- List<String> additionalFields = new ArrayList<>();
- for (String field : kv.getValue().getFieldToEnrichmentTypes().keySet()) {
- if (!fieldSet.contains(field)) {
- additionalFields.add(field);
- }
- }
- //adding only the ones that we don't already have to the field list
- if (additionalFields.size() > 0) {
- _LOG.debug("Adding additional fields: " + Joiner.on(',').join(additionalFields));
- fieldList.addAll(additionalFields);
- sourceConfigsChanged.put(kv.getKey(), config);
- }
- }
- //Add the additional enrichment types to the mapping between the fields
- {
- for(Map.Entry<String, List<String>> fieldToType : kv.getValue().getFieldToEnrichmentTypes().entrySet()) {
- String field = fieldToType.getKey();
- final HashSet<String> types = new HashSet<>(fieldToType.getValue());
- int sizeBefore = 0;
- if(fieldToTypeMap.containsKey(field)) {
- List<String> typeList = fieldToTypeMap.get(field);
- sizeBefore = new HashSet<>(typeList).size();
- types.addAll(typeList);
- }
- int sizeAfter = types.size();
- boolean changed = sizeBefore != sizeAfter;
- if(changed) {
- fieldToTypeMap.put(field, new ArrayList<String>() {{
- addAll(types);
- }});
- sourceConfigsChanged.put(kv.getKey(), config);
- }
- }
- }
- }
- for(Map.Entry<String, SensorEnrichmentConfig> kv : sourceConfigsChanged.entrySet()) {
- scHandler.persistConfig(kv.getKey(), kv.getValue());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
deleted file mode 100644
index 6a45ec9..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorEnrichmentConfig.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.configuration;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.metron.common.utils.JSONUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SensorEnrichmentConfig {
-
- private String index;
- private Map<String, List<String>> enrichmentFieldMap = new HashMap<>();
- private Map<String, List<String>> threatIntelFieldMap = new HashMap<>();
- private Map<String, List<String>> fieldToEnrichmentTypeMap = new HashMap<>();
- private Map<String, List<String>> fieldToThreatIntelTypeMap = new HashMap<>();
- private int batchSize;
-
- public String getIndex() {
- return index;
- }
-
- public void setIndex(String index) {
- this.index = index;
- }
-
- public Map<String, List<String>> getEnrichmentFieldMap() {
- return enrichmentFieldMap;
- }
-
- public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
- this.enrichmentFieldMap = enrichmentFieldMap;
- }
-
- public Map<String, List<String>> getThreatIntelFieldMap() {
- return threatIntelFieldMap;
- }
-
- public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
- this.threatIntelFieldMap = threatIntelFieldMap;
- }
-
- public Map<String, List<String>> getFieldToEnrichmentTypeMap() {
- return fieldToEnrichmentTypeMap;
- }
-
- public Map<String, List<String>> getFieldToThreatIntelTypeMap() {
- return fieldToThreatIntelTypeMap;
- }
- public void setFieldToEnrichmentTypeMap(Map<String, List<String>> fieldToEnrichmentTypeMap) {
- this.fieldToEnrichmentTypeMap = fieldToEnrichmentTypeMap;
- }
-
- public void setFieldToThreatIntelTypeMap(Map<String, List<String>> fieldToThreatIntelTypeMap) {
- this.fieldToThreatIntelTypeMap= fieldToThreatIntelTypeMap;
- }
- public int getBatchSize() {
- return batchSize;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- public static SensorEnrichmentConfig fromBytes(byte[] config) throws IOException {
- return JSONUtils.INSTANCE.load(new String(config), SensorEnrichmentConfig.class);
- }
-
- public String toJSON() throws JsonProcessingException {
- return JSONUtils.INSTANCE.toJSON(this, true);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
-
- if (getBatchSize() != that.getBatchSize()) return false;
- if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
- if (getEnrichmentFieldMap() != null ? !getEnrichmentFieldMap().equals(that.getEnrichmentFieldMap()) : that.getEnrichmentFieldMap() != null)
- return false;
- if (getThreatIntelFieldMap() != null ? !getThreatIntelFieldMap().equals(that.getThreatIntelFieldMap()) : that.getThreatIntelFieldMap() != null)
- return false;
- if (getFieldToEnrichmentTypeMap() != null ? !getFieldToEnrichmentTypeMap().equals(that.getFieldToEnrichmentTypeMap()) : that.getFieldToEnrichmentTypeMap() != null)
- return false;
- return getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().equals(that.getFieldToThreatIntelTypeMap()) : that.getFieldToThreatIntelTypeMap() == null;
-
- }
-
- @Override
- public String toString() {
- return "{index=" + index + ", batchSize=" + batchSize +
- ", enrichmentFieldMap=" + enrichmentFieldMap +
- ", threatIntelFieldMap" + threatIntelFieldMap +
- ", fieldToEnrichmentTypeMap=" + fieldToEnrichmentTypeMap +
- ", fieldToThreatIntelTypeMap=" + fieldToThreatIntelTypeMap + "}";
- }
-
- @Override
- public int hashCode() {
- int result = getIndex() != null ? getIndex().hashCode() : 0;
- result = 31 * result + (getEnrichmentFieldMap() != null ? getEnrichmentFieldMap().hashCode() : 0);
- result = 31 * result + (getThreatIntelFieldMap() != null ? getThreatIntelFieldMap().hashCode() : 0);
- result = 31 * result + (getFieldToEnrichmentTypeMap() != null ? getFieldToEnrichmentTypeMap().hashCode() : 0);
- result = 31 * result + (getFieldToThreatIntelTypeMap() != null ? getFieldToThreatIntelTypeMap().hashCode() : 0);
- result = 31 * result + getBatchSize();
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/deed21e6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java
new file mode 100644
index 0000000..af6c148
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/EnrichmentConfig.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.enrichment;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EnrichmentConfig {
+ private Map<String, List<String>> fieldMap = new HashMap<>();
+ private Map<String, List<String>> fieldToTypeMap = new HashMap<>();
+
+ public Map<String, List<String>> getFieldMap() {
+ return fieldMap;
+ }
+
+ public void setFieldMap(Map<String, List<String>> fieldMap) {
+ this.fieldMap = fieldMap;
+ }
+
+ public Map<String, List<String>> getFieldToTypeMap() {
+ return fieldToTypeMap;
+ }
+
+ public void setFieldToTypeMap(Map<String, List<String>> fieldToTypeMap) {
+ this.fieldToTypeMap = fieldToTypeMap;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ EnrichmentConfig that = (EnrichmentConfig) o;
+
+ if (getFieldMap() != null ? !getFieldMap().equals(that.getFieldMap()) : that.getFieldMap() != null) return false;
+ return getFieldToTypeMap() != null ? getFieldToTypeMap().equals(that.getFieldToTypeMap()) : that.getFieldToTypeMap() == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getFieldMap() != null ? getFieldMap().hashCode() : 0;
+ result = 31 * result + (getFieldToTypeMap() != null ? getFieldToTypeMap().hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "EnrichmentConfig{" +
+ "fieldMap=" + fieldMap +
+ ", fieldToTypeMap=" + fieldToTypeMap +
+ '}';
+ }
+}