You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/03/06 21:25:50 UTC
[3/3] incubator-metron git commit: METRON-694 Index Errors from
Topologies (merrimanr) closes apache/incubator-metron#453
METRON-694 Index Errors from Topologies (merrimanr) closes apache/incubator-metron#453
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/134a2331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/134a2331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/134a2331
Branch: refs/heads/master
Commit: 134a23311c51693381fb52e6a90026420ff61b63
Parents: 27ee490
Author: merrimanr <me...@gmail.com>
Authored: Mon Mar 6 15:23:28 2017 -0600
Committer: YOUR NAME as In Apache <YO...@apache.org>
Committed: Mon Mar 6 15:23:28 2017 -0600
----------------------------------------------------------------------
.../src/main/flux/profiler/remote.yaml | 2 -
.../METRON/CURRENT/role_command_order.json | 3 +-
.../METRON/CURRENT/configuration/metron-env.xml | 5 +-
.../CURRENT/package/files/error_index.template | 53 +++++
.../package/scripts/enrichment_commands.py | 6 +-
.../CURRENT/package/scripts/indexing_master.py | 10 +
.../package/scripts/params/params_linux.py | 3 +-
.../package/scripts/params/status_params.py | 2 -
.../CURRENT/package/scripts/parser_commands.py | 12 -
.../docker/rpm-docker/SPECS/metron.spec | 1 +
.../files/es_templates/error_index.template | 53 +++++
.../templates/config/elasticsearch.global.json | 3 +-
metron-platform/metron-common/README.md | 37 ++++
metron-platform/metron-common/pom.xml | 8 +-
.../org/apache/metron/common/Constants.java | 52 ++++-
.../common/configuration/FieldValidator.java | 2 +
.../apache/metron/common/error/MetronError.java | 218 +++++++++++++++++++
.../common/message/BytesFromPosition.java | 36 +++
.../metron/common/message/JSONFromField.java | 37 ++++
.../metron/common/message/JSONFromPosition.java | 49 +++++
.../common/message/MessageGetStrategy.java | 25 +++
.../metron/common/message/MessageGetters.java | 68 ++++++
.../metron/common/message/ObjectFromField.java | 36 +++
.../apache/metron/common/utils/ErrorUtils.java | 87 +-------
.../apache/metron/common/utils/HashUtils.java | 44 ++++
.../metron/common/error/MetronErrorTest.java | 109 ++++++++++
.../common/message/MessageGettersTest.java | 134 ++++++++++++
.../metron/common/utils/ErrorUtilsTest.java | 21 ++
.../metron/common/utils/HashUtilsTest.java | 55 +++++
.../src/main/config/elasticsearch.properties | 2 +-
.../ElasticsearchIndexingIntegrationTest.java | 5 +-
.../src/main/config/enrichment.properties | 12 +-
.../enrichment/bolt/GenericEnrichmentBolt.java | 16 +-
.../apache/metron/enrichment/bolt/JoinBolt.java | 43 ++--
.../bolt/BulkMessageWriterBoltTest.java | 8 +-
.../bolt/GenericEnrichmentBoltTest.java | 44 +++-
.../metron/enrichment/bolt/JoinBoltTest.java | 23 +-
.../integration/EnrichmentIntegrationTest.java | 76 ++++---
metron-platform/metron-indexing/README.md | 2 +-
.../main/config/zookeeper/indexing/error.json | 17 ++
.../src/main/flux/indexing/remote.yaml | 5 +-
.../integration/IndexingIntegrationTest.java | 6 +-
.../metron/integration/ProcessorResult.java | 23 +-
.../integration/processors/KafkaMessageSet.java | 7 +-
.../integration/processors/KafkaProcessor.java | 16 +-
metron-platform/metron-parsers/README.md | 8 +-
.../apache/metron/parsers/bolt/ParserBolt.java | 69 ++++--
.../apache/metron/parsers/bolt/WriterBolt.java | 34 ++-
.../metron/parsers/bolt/WriterHandler.java | 8 +-
.../parsers/topology/ParserTopologyBuilder.java | 39 +---
.../parsers/topology/ParserTopologyCLI.java | 2 -
.../metron/parsers/bolt/ParserBoltTest.java | 71 ++++++
.../metron/parsers/bolt/WriterBoltTest.java | 36 ++-
.../integration/ParserIntegrationTest.java | 9 +-
.../components/ParserTopologyComponent.java | 9 +-
.../integration/WriterBoltIntegrationTest.java | 43 ++--
.../metron-solr/src/main/config/solr.properties | 2 +-
.../SolrIndexingIntegrationTest.java | 9 +-
.../test/error/MetronErrorJSONMatcher.java | 42 ++++
.../metron/writer/BulkWriterComponent.java | 48 ++--
.../writer/bolt/BulkMessageWriterBolt.java | 43 ++--
.../metron/writer/message/MessageGetter.java | 26 ---
.../metron/writer/message/MessageGetters.java | 37 ----
.../writer/message/NamedMessageGetter.java | 34 ---
.../metron/writer/message/RawMessageGetter.java | 50 -----
.../metron/writer/BulkWriterComponentTest.java | 197 +++++++++++++++++
66 files changed, 1752 insertions(+), 540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 0a26b73..f97b97a 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -127,8 +127,6 @@ bolts:
configMethods:
- name: "withMessageWriter"
args: [ref: "kafkaWriter"]
- - name: "withMessageGetter"
- args: ["NAMED"]
streams:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
index e08f401..0b04f12 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
@@ -4,8 +4,9 @@
"general_deps" : {
"_comment" : "dependencies for all cases",
"METRON_INDEXING-INSTALL" : ["METRON_PARSERS-INSTALL"],
+ "METRON_ENRICHMENT-INSTALL" : ["METRON_INDEXING-INSTALL"],
"METRON_PARSERS-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START"],
- "METRON_ENRICHMENT_MASTER-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START", "HBASE_MASTER-START", "HBASE_REGIONSERVER-START"],
+ "METRON_ENRICHMENT_MASTER-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START", "HBASE_MASTER-START", "HBASE_REGIONSERVER-START", "METRON_INDEXING-START"],
"METRON_ENRICHMENT_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_ENRICHMENT_MASTER-START"],
"METRON_INDEXING-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START","METRON_PARSERS-START"],
"METRON_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_PARSERS-START","METRON_INDEXING-START"]
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index eeb2037..cbff4a9 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -150,7 +150,8 @@
{
"es.clustername": "{{ es_cluster_name }}",
"es.ip": "{{ es_url }}",
-"es.date.format": "yyyy.MM.dd.HH"
+"es.date.format": "yyyy.MM.dd.HH",
+"parser.error.topic": "indexing"
}
</value>
<value-attributes>
@@ -171,7 +172,7 @@ kafka.broker={{ kafka_brokers }}
kafka.start=WHERE_I_LEFT_OFF
##### Indexing #####
index.input.topic=indexing
-index.error.topic=indexing_error
+index.error.topic=indexing
writer.class.name=org.apache.metron.elasticsearch.writer.ElasticsearchWriter
##### Metrics #####
#reporters
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template
new file mode 100644
index 0000000..070c90f
--- /dev/null
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template
@@ -0,0 +1,53 @@
+{
+ "template": "error_index*",
+ "mappings": {
+ "error_doc": {
+ "_timestamp": {
+ "enabled": true
+ },
+ "properties": {
+ "exception": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "hostname": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "stack": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "time": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "message": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "raw_message": {
+ "type": "string",
+ "index": "not_analyzed",
+ "ignore_above": 8191
+ },
+ "raw_message_bytes": {
+ "type": "binary",
+ "index": "no"
+ },
+ "raw_message_hash": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "source_type": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "error_type": {
+ "type": "string",
+ "index": "not_analyzed"
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index bc73c87..817f266 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -28,8 +28,6 @@ class EnrichmentCommands:
__params = None
__enrichment_topology = None
__enrichment_topic = None
- __enrichment_error_topic = None
- __threat_intel_error_topic = None
__configured = False
def __init__(self, params):
@@ -38,8 +36,6 @@ class EnrichmentCommands:
self.__params = params
self.__enrichment_topology = params.metron_enrichment_topology
self.__enrichment_topic = params.metron_enrichment_topic
- self.__enrichment_error_topic = params.metron_enrichment_error_topic
- self.__threat_intel_error_topic = params.metron_threat_intel_error_topic
self.__configured = os.path.isfile(self.__params.enrichment_configured_flag_file)
def is_configured(self):
@@ -121,7 +117,7 @@ class EnrichmentCommands:
retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
Logger.info("Creating topics for enrichment")
- topics = [self.__enrichment_topic, self.__enrichment_error_topic, self.__threat_intel_error_topic]
+ topics = [self.__enrichment_topic]
for topic in topics:
Logger.info("Creating topic'{0}'".format(topic))
Execute(command_template.format(self.__params.kafka_bin_dir,
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
index efc048d..53fb17b 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
@@ -95,6 +95,11 @@ class Indexing(Script):
content=StaticFile('yaf_index.template')
)
+ File(params.error_index_path,
+ mode=0755,
+ content=StaticFile('error_index.template')
+ )
+
bro_cmd = ambari_format(
'curl -s -XPOST http://{es_http_url}/_template/bro_index -d @{bro_index_path}')
Execute(bro_cmd, logoutput=True)
@@ -104,6 +109,9 @@ class Indexing(Script):
yaf_cmd = ambari_format(
'curl -s -XPOST http://{es_http_url}/_template/yaf_index -d @{yaf_index_path}')
Execute(yaf_cmd, logoutput=True)
+ error_cmd = ambari_format(
+ 'curl -s -XPOST http://{es_http_url}/_template/error_index -d @{error_index_path}')
+ Execute(error_cmd, logoutput=True)
def elasticsearch_template_delete(self, env):
from params import params
@@ -115,6 +123,8 @@ class Indexing(Script):
Execute(snort_cmd, logoutput=True)
yaf_cmd = ambari_format('curl -s -XDELETE "http://{es_http_url}/yaf_index*"')
Execute(yaf_cmd, logoutput=True)
+ error_cmd = ambari_format('curl -s -XDELETE "http://{es_http_url}/error_index*"')
+ Execute(error_cmd, logoutput=True)
def zeppelin_notebook_import(self, env):
from params import params
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 2427d25..2b8276b 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -158,13 +158,12 @@ threatintel_cf = status_params.threatintel_cf
metron_enrichment_topology = status_params.metron_enrichment_topology
metron_enrichment_topic = status_params.metron_enrichment_topic
-metron_enrichment_error_topic = status_params.metron_enrichment_error_topic
-metron_threat_intel_error_topic = status_params.metron_threat_intel_error_topic
# ES Templates
bro_index_path = tmp_dir + "/bro_index.template"
snort_index_path = tmp_dir + "/snort_index.template"
yaf_index_path = tmp_dir + "/yaf_index.template"
+error_index_path = tmp_dir + "/error_index.template"
# Zeppelin Notebooks
metron_config_zeppelin_path = format("{metron_config_path}/zeppelin")
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
index e8a8568..961102f 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
@@ -34,8 +34,6 @@ parsers_configured_flag_file = metron_zookeeper_config_path + '/../metron_parser
# Enrichment
metron_enrichment_topology = 'enrichment'
metron_enrichment_topic = 'enrichments'
-metron_enrichment_error_topic = 'enrichments_error'
-metron_threat_intel_error_topic = 'threatintel_error'
enrichment_table = 'enrichment'
enrichment_cf = 't'
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index e6f0f3a..574d7e8 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -118,18 +118,6 @@ class ParserCommands:
num_partitions,
replication_factor,
retention_bytes))
- Logger.info("Creating topics for error handling")
- Execute(command_template.format(self.__params.kafka_bin_dir,
- self.__params.zookeeper_quorum,
- "parser_invalid",
- num_partitions,
- replication_factor,
- retention_bytes))
- Execute(command_template.format(self.__params.kafka_bin_dir,
- self.__params.zookeeper_quorum,
- "parser_error",
- num_partitions, replication_factor,
- retention_bytes))
Logger.info("Done creating Kafka topics")
def start_parser_topologies(self):
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 3e8a11c..2c619e1 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -260,6 +260,7 @@ This package installs the Metron Indexing files
%{metron_home}/config/zookeeper/indexing/websphere.json
%{metron_home}/config/zookeeper/indexing/yaf.json
%{metron_home}/config/zookeeper/indexing/asa.json
+%{metron_home}/config/zookeeper/indexing/error.json
%{metron_home}/config/zeppelin/metron/metron-yaf-telemetry.json
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template b/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template
new file mode 100644
index 0000000..070c90f
--- /dev/null
+++ b/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template
@@ -0,0 +1,53 @@
+{
+ "template": "error_index*",
+ "mappings": {
+ "error_doc": {
+ "_timestamp": {
+ "enabled": true
+ },
+ "properties": {
+ "exception": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "hostname": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "stack": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "time": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "message": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "raw_message": {
+ "type": "string",
+ "index": "not_analyzed",
+ "ignore_above": 8191
+ },
+ "raw_message_bytes": {
+ "type": "binary",
+ "index": "no"
+ },
+ "raw_message_hash": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "source_type": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "error_type": {
+ "type": "string",
+ "index": "not_analyzed"
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json b/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
index 8177102..87af1c0 100644
--- a/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
+++ b/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
@@ -2,5 +2,6 @@
"es.clustername": "{{ elasticsearch_cluster_name }}",
"es.ip": "{{ groups.search[0] }}",
"es.port": "{{ elasticsearch_transport_port }}",
- "es.date.format": "yyyy.MM.dd.HH"
+ "es.date.format": "yyyy.MM.dd.HH",
+ "parser.error.topic": "indexing"
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index 97ecd58..d66ab0c 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -784,6 +784,7 @@ This configuration is stored in zookeeper, but looks something like
"es.ip": "node1",
"es.port": "9300",
"es.date.format": "yyyy.MM.dd.HH",
+ "parser.error.topic": "indexing",
"fieldValidations" : [
{
"input" : [ "ip_src_addr", "ip_dst_addr" ],
@@ -901,3 +902,39 @@ Usage examples:
* To dump the existing configs from zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m DUMP`
* To push the configs into zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PUSH -i $METRON_HOME/config/zookeeper`
* To pull the configs from zookeeper to the singlenode vagrant machine disk: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PULL -o $METRON_HOME/config/zookeeper -f`
+
+# Topology Errors
+
+Errors generated in Metron topologies are transformed into JSON format and follow this structure:
+
+```
+{
+ "exception": "java.lang.IllegalStateException: Unable to parse Message: ...",
+ "failed_sensor_type": "bro",
+ "stack": "java.lang.IllegalStateException: Unable to parse Message: ...",
+ "hostname": "node1",
+ "source:type": "error",
+ "raw_message": "{\"http\": {\"ts\":1488809627.000000.31915,\"uid\":\"C9JpSd2vFAWo3mXKz1\", ...",
+ "error_hash": "f7baf053f2d3c801a01d196f40f3468e87eea81788b2567423030100865c5061",
+ "error_type": "parser_error",
+ "message": "Unable to parse Message: {\"http\": {\"ts\":1488809627.000000.31915,\"uid\":\"C9JpSd2vFAWo3mXKz1\", ...",
+ "timestamp": 1488809630698
+}
+```
+
+Each topology can be configured to send error messages to a specific Kafka topic. The parser topologies retrieve this setting from the `parser.error.topic` setting in the global config:
+```
+{
+ "es.clustername": "metron",
+ "es.ip": "node1",
+ "es.port": "9300",
+ "es.date.format": "yyyy.MM.dd.HH",
+ "parser.error.topic": "indexing"
+}
+```
+
+Error topics for enrichment and threat intel errors are passed into the enrichment topology as flux properties named `enrichment.error.topic` and `threat.intel.error.topic`. These properties can be found in `$METRON_HOME/config/enrichment.properties`.
+
+The error topic for indexing errors is passed into the indexing topology as a flux property named `index.error.topic`. This property can be found in either `$METRON_HOME/config/elasticsearch.properties` or `$METRON_HOME/config/solr.properties` depending on the search engine selected.
+
+By default all error messages are sent to the `indexing` topic so that they are indexed and archived, just like other messages. The indexing config for error messages can be found at `$METRON_HOME/config/zookeeper/indexing/error.json`.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index d51762a..8d1e183 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -308,7 +308,6 @@
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
@@ -363,7 +362,12 @@
<artifactId>stream</artifactId>
<version>2.9.5</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-test-utilities</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<reporting>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 4230678..29be31e 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -27,14 +27,8 @@ public class Constants {
public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
public static final String SENSOR_TYPE = "source.type";
public static final String ENRICHMENT_TOPIC = "enrichments";
- public static final String ENRICHMENT_ERROR_TOPIC = "enrichments_error";
- public static final String THREAT_INTEL_ERROR_TOPIC = "threatintel_error";
public static final String INDEXING_TOPIC = "indexing";
- public static final String INDEXING_ERROR_TOPIC = "indexing_error";
- public static final String DEFAULT_PARSER_ERROR_TOPIC = "parser_error";
- public static final String DEFAULT_PARSER_INVALID_TOPIC = "parser_invalid";
public static final String ERROR_STREAM = "error";
- public static final String INVALID_STREAM = "invalid";
public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
@@ -72,5 +66,51 @@ public class Constants {
}
}
+ public enum ErrorFields {
+ MESSAGE("message")
+ ,FAILED_SENSOR_TYPE("failed_sensor_type")
+ ,ERROR_TYPE("error_type")
+ ,EXCEPTION("exception")
+ ,STACK("stack")
+ ,TIMESTAMP("timestamp")
+ ,HOSTNAME("hostname")
+ ,RAW_MESSAGE("raw_message")
+ ,RAW_MESSAGE_BYTES("raw_message_bytes")
+ ,ERROR_FIELDS("error_fields")
+ ,ERROR_HASH("error_hash")
+ ;
+
+ private String name;
+
+ ErrorFields(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+
+ public enum ErrorType {
+
+ PARSER_ERROR("parser_error")
+ ,PARSER_INVALID("parser_invalid")
+ ,ENRICHMENT_ERROR("enrichments_error")
+ ,THREAT_INTEL_ERROR("threatintel_error")
+ ,INDEXING_ERROR("indexing_error")
+ ,DEFAULT_ERROR("error")
+ ;
+
+ private String type;
+
+ ErrorType(String type) {
+ this.type = type;
+ }
+
+ public String getType() {
+ return type;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
index 3302426..970566c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
@@ -66,6 +66,8 @@ public class FieldValidator implements Serializable {
for(Object inputO : (List<Object>)inputObj) {
input.add(inputO.toString());
}
+ } else {
+ input = new ArrayList<>();
}
config = Config.CONFIG.get(validatorConfig, Map.class);
if(config == null) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
new file mode 100644
index 0000000..2837d34
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.error;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.Constants.ErrorType;
+import org.apache.metron.common.utils.HashUtils;
+import org.json.simple.JSONObject;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.metron.common.Constants.ErrorFields;
+
+/**
+ * Fluent description of an error, which can be rendered as a JSON document with
+ * {@link #getJSONObject()}.
+ *
+ * <p>The {@code with*} and {@code add*} methods mutate this instance and return it so that calls
+ * can be chained. No synchronization is performed.
+ */
+public class MetronError {
+
+  private String message;
+  private Throwable throwable;
+  private String sensorType = "error";
+  private ErrorType errorType = ErrorType.DEFAULT_ERROR;
+  private Set<String> errorFields;
+  private List<Object> rawMessages;
+
+  /**
+   * @param message explicit error message; when set it takes precedence over the throwable's message
+   * @return this instance
+   */
+  public MetronError withMessage(String message) {
+    this.message = message;
+    return this;
+  }
+
+  /**
+   * @param throwable cause of the error; supplies the exception and stack fields, and the message
+   *                  when no explicit message was set
+   * @return this instance
+   */
+  public MetronError withThrowable(Throwable throwable) {
+    this.throwable = throwable;
+    return this;
+  }
+
+  /**
+   * @param sensorType sensor type reported in the failed sensor type field (defaults to "error")
+   * @return this instance
+   */
+  public MetronError withSensorType(String sensorType) {
+    this.sensorType = sensorType;
+    return this;
+  }
+
+  /**
+   * @param errorType category of the error (defaults to {@link ErrorType#DEFAULT_ERROR})
+   * @return this instance
+   */
+  public MetronError withErrorType(ErrorType errorType) {
+    this.errorType = errorType;
+    return this;
+  }
+
+  /**
+   * @param errorFields names of the raw message fields involved in the error; only used when there
+   *                    is exactly one raw message and it is a {@link JSONObject}
+   * @return this instance
+   */
+  public MetronError withErrorFields(Set<String> errorFields) {
+    this.errorFields = errorFields;
+    return this;
+  }
+
+  /**
+   * Appends a raw message. A {@code null} argument is ignored.
+   *
+   * @param rawMessage the message that was being processed when the error occurred
+   * @return this instance
+   */
+  public MetronError addRawMessage(Object rawMessage) {
+    if (rawMessage != null) {
+      if (this.rawMessages == null) {
+        this.rawMessages = new ArrayList<>();
+      }
+      this.rawMessages.add(rawMessage);
+    }
+    return this;
+  }
+
+  /**
+   * Replaces the raw messages with a copy of the given list.
+   *
+   * <p>The list is copied rather than stored, so a later {@link #addRawMessage(Object)} neither
+   * modifies the caller's list nor fails on a fixed-size or unmodifiable one. {@code null}
+   * elements are dropped, as {@link #addRawMessage(Object)} does, because they cannot be rendered
+   * by {@link #getJSONObject()}.
+   *
+   * @param rawMessages the raw messages, or {@code null} to clear them
+   * @return this instance
+   */
+  public MetronError withRawMessages(List<Object> rawMessages) {
+    if (rawMessages == null) {
+      this.rawMessages = null;
+      return this;
+    }
+    List<Object> copy = new ArrayList<>(rawMessages.size());
+    for (Object rawMessage : rawMessages) {
+      if (rawMessage != null) {
+        copy.add(rawMessage);
+      }
+    }
+    this.rawMessages = copy;
+    return this;
+  }
+
+  /**
+   * @return the throwable set with {@link #withThrowable(Throwable)}, or empty if none was set
+   */
+  public Optional<Throwable> getThrowable() {
+    return Optional.ofNullable(throwable);
+  }
+
+  /**
+   * Builds the JSON representation of this error.
+   *
+   * <p>The sensor type field is always "error"; the sensor type configured on this instance is
+   * written to the failed sensor type field instead. Message, exception, stack, timestamp,
+   * hostname, raw message and hash fields are added when the corresponding data is available.
+   *
+   * @return a newly created JSON object
+   */
+  @SuppressWarnings({"unchecked"})
+  public JSONObject getJSONObject() {
+    JSONObject errorMessage = new JSONObject();
+    errorMessage.put(Constants.SENSOR_TYPE, "error");
+    errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorType);
+    errorMessage.put(ErrorFields.ERROR_TYPE.getName(), errorType.getType());
+
+    addMessageString(errorMessage);
+    addStacktrace(errorMessage);
+    addTimestamp(errorMessage);
+    addHostname(errorMessage);
+    addRawMessages(errorMessage);
+    addErrorHash(errorMessage);
+
+    return errorMessage;
+  }
+
+  /** Writes the explicit message if present, otherwise the throwable's message. */
+  @SuppressWarnings({"unchecked"})
+  private void addMessageString(JSONObject errorMessage) {
+    if (message != null) {
+      errorMessage.put(ErrorFields.MESSAGE.getName(), message);
+    } else if (throwable != null) {
+      errorMessage.put(ErrorFields.MESSAGE.getName(), throwable.getMessage());
+    }
+  }
+
+  /** Writes the throwable's {@code toString()} and full stack trace, if a throwable was set. */
+  @SuppressWarnings({"unchecked"})
+  private void addStacktrace(JSONObject errorMessage) {
+    if (throwable != null) {
+      String stackTrace = ExceptionUtils.getStackTrace(throwable);
+      String exception = throwable.toString();
+      errorMessage.put(ErrorFields.EXCEPTION.getName(), exception);
+      errorMessage.put(ErrorFields.STACK.getName(), stackTrace);
+    }
+  }
+
+  /** Writes the current time in epoch milliseconds. */
+  @SuppressWarnings({"unchecked"})
+  private void addTimestamp(JSONObject errorMessage) {
+    errorMessage.put(ErrorFields.TIMESTAMP.getName(), System.currentTimeMillis());
+  }
+
+  /** Writes the local host name when it can be resolved. */
+  @SuppressWarnings({"unchecked"})
+  private void addHostname(JSONObject errorMessage) {
+    try {
+      errorMessage.put(ErrorFields.HOSTNAME.getName(), InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException ex) {
+      // Leave the hostname field off if it cannot be found
+    }
+  }
+
+  /**
+   * Writes each raw message as a string. A single raw message uses the plain raw message field
+   * name; when several are present the field name is suffixed with "_" and the message index.
+   */
+  @SuppressWarnings({"unchecked"})
+  private void addRawMessages(JSONObject errorMessage) {
+    if (rawMessages == null) {
+      return;
+    }
+    // It's unclear if a separate raw message bytes field is needed, so none is written for now.
+    boolean single = rawMessages.size() == 1;
+    for (int i = 0; i < rawMessages.size(); i++) {
+      String rawMessageField = single
+          ? ErrorFields.RAW_MESSAGE.getName()
+          : ErrorFields.RAW_MESSAGE.getName() + "_" + i;
+      errorMessage.put(rawMessageField, rawMessageToString(rawMessages.get(i)));
+    }
+  }
+
+  /** Renders a raw message: byte arrays are decoded, JSON objects serialized, others stringified. */
+  private static String rawMessageToString(Object rawMessage) {
+    if (rawMessage instanceof byte[]) {
+      return Bytes.toString((byte[]) rawMessage);
+    }
+    if (rawMessage instanceof JSONObject) {
+      return ((JSONObject) rawMessage).toJSONString();
+    }
+    return rawMessage.toString();
+  }
+
+  /**
+   * Writes a hash of the raw message, but only when there is exactly one raw message.
+   *
+   * <p>For a {@link JSONObject} with error fields configured, the error fields are written as a
+   * comma-separated string and the hash covers only those fields; otherwise the whole message is
+   * hashed.
+   */
+  @SuppressWarnings({"unchecked"})
+  private void addErrorHash(JSONObject errorMessage) {
+    if (rawMessages == null || rawMessages.size() != 1) {
+      return;
+    }
+    Object rawMessage = rawMessages.get(0);
+    String hash;
+    if (rawMessage instanceof JSONObject) {
+      JSONObject rawJSON = (JSONObject) rawMessage;
+      if (errorFields != null) {
+        errorMessage.put(ErrorFields.ERROR_FIELDS.getName(), String.join(",", errorFields));
+        hash = HashUtils.getMessageHash(rawJSON, errorFields);
+      } else {
+        hash = HashUtils.getMessageHash(rawJSON);
+      }
+    } else if (rawMessage instanceof byte[]) {
+      hash = HashUtils.getMessageHash((byte[]) rawMessage);
+    } else {
+      hash = HashUtils.getMessageHash(rawMessage.toString().getBytes(UTF_8));
+    }
+    errorMessage.put(ErrorFields.ERROR_HASH.getName(), hash);
+  }
+
+  /**
+   * Compares all configured attributes. Raw messages are compared with {@link List#equals(Object)},
+   * so {@code byte[]} raw messages are equal only when they are the same array instance.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    MetronError that = (MetronError) o;
+
+    if (message != null ? !message.equals(that.message) : that.message != null)
+      return false;
+    if (throwable != null ? !throwable.equals(that.throwable) : that.throwable != null)
+      return false;
+    if (sensorType != null ? !sensorType.equals(that.sensorType) : that.sensorType != null)
+      return false;
+    if (errorType != null ? !errorType.equals(that.errorType) : that.errorType != null)
+      return false;
+    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields != null)
+      return false;
+    return rawMessages != null ? rawMessages.equals(that.rawMessages) : that.rawMessages == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = message != null ? message.hashCode() : 0;
+    result = 31 * result + (throwable != null ? throwable.hashCode() : 0);
+    result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0);
+    result = 31 * result + (errorType != null ? errorType.hashCode() : 0);
+    result = 31 * result + (errorFields != null ? errorFields.hashCode() : 0);
+    result = 31 * result + (rawMessages != null ? rawMessages.hashCode() : 0);
+    return result;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
new file mode 100644
index 0000000..b73228f
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * {@link MessageGetStrategy} that returns the binary value found at a fixed tuple position.
+ */
+public class BytesFromPosition implements MessageGetStrategy {
+
+  /** Index of the tuple value to read. */
+  private final int position;
+
+  /** Creates a strategy that reads position 0. */
+  public BytesFromPosition() {
+    this(0);
+  }
+
+  /**
+   * @param position index of the tuple value to read
+   */
+  public BytesFromPosition(int position) {
+    this.position = position;
+  }
+
+  /**
+   * @param tuple the tuple to read from
+   * @return the result of {@code tuple.getBinary} for the configured position
+   */
+  @Override
+  public byte[] get(Tuple tuple) {
+    final byte[] payload = tuple.getBinary(position);
+    return payload;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
new file mode 100644
index 0000000..39fe9dd
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+public class JSONFromField implements MessageGetStrategy {
+
+ private String fieldValue = "message";
+
+ public JSONFromField() {};
+
+ public JSONFromField(String fieldValue) {
+ this.fieldValue = fieldValue;
+ }
+
+ @Override
+ public JSONObject get(Tuple tuple) {
+ return (JSONObject) ((JSONObject) tuple.getValueByField(fieldValue)).clone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
new file mode 100644
index 0000000..4407d4f
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+public class JSONFromPosition implements MessageGetStrategy {
+
+ private int position = 0;
+
+ private ThreadLocal<JSONParser> parser = new ThreadLocal<JSONParser>() {
+ @Override
+ protected JSONParser initialValue() {
+ return new JSONParser();
+ }
+ };
+
+ public JSONFromPosition() {};
+
+ public JSONFromPosition(int position) {
+ this.position = position;
+ }
+
+ @Override
+ public JSONObject get(Tuple tuple) {
+ try {
+ return (JSONObject) parser.get().parse(new String(tuple.getBinary(position), "UTF8"));
+ } catch (Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
new file mode 100644
index 0000000..0595ce1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Strategy for extracting a value from a {@link Tuple}.
+ */
+public interface MessageGetStrategy {
+
+ /**
+ * Returns the value this strategy extracts from the given tuple.
+ *
+ * @param tuple the tuple to read from
+ * @return the extracted value; its concrete type depends on the implementation
+ */
+ Object get(Tuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
new file mode 100644
index 0000000..a496e08
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.function.Function;
+
+/**
+ * Convenience enum for looking up the available {@link MessageGetStrategy} implementations, each
+ * of which returns a value from a tuple.
+ *
+ * <p>Constants resolved with {@link #get(String)}, which builds a strategy from the argument:
+ * <ul>
+ * <li>BYTES_FROM_POSITION - gets a byte array from the provided position</li>
+ * <li>JSON_FROM_POSITION - gets a byte array from the provided position then converts to a string and parses the string to JSON</li>
+ * <li>JSON_FROM_FIELD - gets a JSONObject from the provided field</li>
+ * <li>OBJECT_FROM_FIELD - gets an Object from the provided field</li>
+ * </ul>
+ *
+ * <p>Constants resolved with {@link #get()}, which returns a strategy built once with defaults:
+ * <ul>
+ * <li>DEFAULT_BYTES_FROM_POSITION - gets a byte array from position 0</li>
+ * <li>DEFAULT_JSON_FROM_POSITION - gets a byte array from position 0 then converts to a string and parses the string to JSON</li>
+ * <li>DEFAULT_JSON_FROM_FIELD - gets a JSONObject from the "message" field</li>
+ * <li>DEFAULT_OBJECT_FROM_FIELD - gets an Object from the "message" field</li>
+ * </ul>
+ *
+ * <p>Each constant supports only one of the two accessors: {@link #get(String)} on a
+ * {@code DEFAULT_*} constant fails with a {@link NullPointerException}, and {@link #get()} on an
+ * argument-taking constant returns {@code null}.
+ */
+public enum MessageGetters {
+
+  // The lambdas are explicitly typed so that they select the Function-based constructor.
+  BYTES_FROM_POSITION((String position) -> new BytesFromPosition(ConversionUtils.convert(position, Integer.class))),
+  JSON_FROM_POSITION((String position) -> new JSONFromPosition(ConversionUtils.convert(position, Integer.class))),
+  JSON_FROM_FIELD((String field) -> new JSONFromField(field)),
+  OBJECT_FROM_FIELD((String field) -> new ObjectFromField(field)),
+  DEFAULT_BYTES_FROM_POSITION(new BytesFromPosition()),
+  DEFAULT_JSON_FROM_POSITION(new JSONFromPosition()),
+  DEFAULT_JSON_FROM_FIELD(new JSONFromField()),
+  DEFAULT_OBJECT_FROM_FIELD(new ObjectFromField());
+
+  /** Factory used by {@link #get(String)}; {@code null} for the {@code DEFAULT_*} constants. */
+  final Function<String, MessageGetStrategy> messageGetStrategyFunction;
+  /** Pre-built strategy returned by {@link #get()}; {@code null} for argument-taking constants. */
+  final MessageGetStrategy messageGetStrategy;
+
+  MessageGetters(MessageGetStrategy strategy) {
+    this.messageGetStrategyFunction = null;
+    this.messageGetStrategy = strategy;
+  }
+
+  MessageGetters(Function<String, MessageGetStrategy> factory) {
+    this.messageGetStrategyFunction = factory;
+    this.messageGetStrategy = null;
+  }
+
+  /**
+   * @param arg the position or field name, as a string, handed to the constant's factory
+   * @return a new strategy configured from {@code arg}
+   */
+  public MessageGetStrategy get(String arg) {
+    final MessageGetStrategy created = messageGetStrategyFunction.apply(arg);
+    return created;
+  }
+
+  /**
+   * @return the pre-built strategy of this constant, or {@code null} if it has none
+   */
+  public MessageGetStrategy get() {
+    return this.messageGetStrategy;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
new file mode 100644
index 0000000..120c09c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * {@link MessageGetStrategy} that returns the value stored in a named tuple field as-is.
+ */
+public class ObjectFromField implements MessageGetStrategy {
+
+  /** Name of the tuple field to read. */
+  private final String fieldValue;
+
+  /** Creates a strategy that reads the "message" field. */
+  public ObjectFromField() {
+    this("message");
+  }
+
+  /**
+   * @param fieldValue name of the tuple field to read
+   */
+  public ObjectFromField(String fieldValue) {
+    this.fieldValue = fieldValue;
+  }
+
+  /**
+   * @param tuple the tuple to read from
+   * @return the result of {@code tuple.getValueByField} for the configured field
+   */
+  @Override
+  public Object get(Tuple tuple) {
+    final Object value = tuple.getValueByField(fieldValue);
+    return value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index 3b3f426..f4e3a8d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,30 +17,17 @@
*/
package org.apache.metron.common.utils;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.apache.commons.beanutils.Converter;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.metron.common.Constants;
-import org.json.simple.JSONObject;
+import org.apache.metron.common.error.MetronError;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Optional;
import java.util.function.Function;
@@ -102,74 +89,16 @@ public class ErrorUtils {
}
}
- @SuppressWarnings("unchecked")
- public static JSONObject generateErrorMessage(String message, Throwable t)
+ public static void handleError(OutputCollector collector, MetronError error)
{
- return generateErrorMessage(message, t, Optional.empty(), Optional.empty());
- }
- public static JSONObject generateErrorMessage(String message
- , Throwable t
- , Optional<String> sensorType
- , Optional<Object> rawMessage
- )
- {
- JSONObject error_message = new JSONObject();
-
- /*
- * Save full stack trace in object.
- */
- String stackTrace = ExceptionUtils.getStackTrace(t);
-
- String exception = t.toString();
-
-
- error_message.put("time", System.currentTimeMillis());
- try {
- error_message.put("hostname", InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException ex) {
-
+ collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+ Optional<Throwable> throwable = error.getThrowable();
+ if (throwable.isPresent()) {
+ collector.reportError(throwable.get());
}
- if(rawMessage.isPresent()) {
- if(rawMessage.get() instanceof byte[]) {
- error_message.put("rawMessage", Bytes.toString((byte[])rawMessage.get()));
- error_message.put("rawMessage_bytes", toByteArrayList((byte[])rawMessage.get()));
- }
- else {
- error_message.put("rawMessage", rawMessage.get());
- }
- }
- error_message.put("message", message);
- error_message.put(Constants.SENSOR_TYPE, StringUtils.join("_", sensorType, Optional.of("error")));
- error_message.put("exception", exception);
- error_message.put("stack", stackTrace);
- return error_message;
- }
-
- private static List<Byte> toByteArrayList(byte[] list) {
- List<Byte> ret = new ArrayList<>();
- for(byte b : list) {
- ret.add(b);
- }
- return ret;
}
- public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
- handleError(collector, t, errorStream, Optional.empty(), Optional.empty());
- }
- public static void handleError(OutputCollector collector
- , Throwable t
- , String errorStream
- , Optional<String> sensorType
- , Optional<Object> rawMessage
- )
- {
- JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t, sensorType, rawMessage);
- collector.emit(errorStream, new Values(error));
- collector.reportError(t);
- }
-
-
public static String generateThreadDump() {
final StringBuilder dump = new StringBuilder();
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java
new file mode 100644
index 0000000..b5170ce
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.json.simple.JSONObject;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class HashUtils {
+
+ public static String getMessageHash(JSONObject message, Collection<String> hashFields) {
+ List<String> hashElements = hashFields.stream().map(errorField ->
+ String.format("%s-%s", errorField, message.get(errorField))).collect(Collectors.toList());
+ return DigestUtils.sha256Hex(String.join("|", hashElements).getBytes(UTF_8));
+ }
+
+ public static String getMessageHash(JSONObject message) {
+ return DigestUtils.sha256Hex(message.toJSONString().getBytes(UTF_8));
+ }
+
+ public static String getMessageHash(byte[] bytes) {
+ return DigestUtils.sha256Hex(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
new file mode 100644
index 0000000..5e505a8
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.error;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Bytes;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.apache.metron.common.Constants.ErrorFields.RAW_MESSAGE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the JSON document produced by {@link MetronError#getJSONObject()}.
+ */
+public class MetronErrorTest {
+
+ // Populated in setup(); the raw message test declares local variables with the same names,
+ // which shadow these fields there.
+ private JSONObject message1 = new JSONObject();
+ private JSONObject message2 = new JSONObject();
+
+ @Before
+ public void setup() {
+ message1.put("value", "message1");
+ message2.put("value", "message2");
+ }
+
+ // Message, error type, sensor type, failed sensor type, hostname and timestamp are present.
+ // The sensor type field is expected to be "error", while the configured sensor type is
+ // expected in the failed sensor type field.
+ @Test
+ public void getJSONObjectShouldReturnBasicInformation() {
+ MetronError error = new MetronError()
+ .withMessage("test message")
+ .withErrorType(Constants.ErrorType.PARSER_ERROR)
+ .withSensorType("sensorType");
+
+ JSONObject errorJSON = error.getJSONObject();
+ assertEquals("test message", errorJSON.get(Constants.ErrorFields.MESSAGE.getName()));
+ assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorJSON.get(Constants.ErrorFields.ERROR_TYPE.getName()));
+ assertEquals("error", errorJSON.get(Constants.SENSOR_TYPE));
+ assertEquals("sensorType", errorJSON.get(Constants.ErrorFields.FAILED_SENSOR_TYPE.getName()));
+ assertTrue(((String) errorJSON.get(Constants.ErrorFields.HOSTNAME.getName())).length() > 0);
+ assertTrue(((long) errorJSON.get(Constants.ErrorFields.TIMESTAMP.getName())) > 0);
+ }
+
+ // With only a throwable set, the exception, stack and message fields are derived from it.
+ @Test
+ public void getJSONObjectShouldHandleThrowable() {
+ Throwable e = new Exception("test exception");
+ MetronError error = new MetronError().withThrowable(e);
+
+ JSONObject errorJSON = error.getJSONObject();
+ assertEquals("java.lang.Exception: test exception", errorJSON.get(Constants.ErrorFields.EXCEPTION.getName()));
+ assertTrue(((String) errorJSON.get(Constants.ErrorFields.STACK.getName())).startsWith("java.lang.Exception: test exception"));
+ assertEquals(e.getMessage(), errorJSON.get(Constants.ErrorFields.MESSAGE.getName()));
+ }
+
+ // Covers three cases: two raw messages (field names suffixed with _0 and _1), a single byte
+ // array raw message, and a single JSON raw message. The single-message cases also check the
+ // error hash against a fixed expected digest.
+ @Test
+ public void getJSONObjectShouldIncludeRawMessages() {
+ JSONObject message1 = new JSONObject();
+ JSONObject message2 = new JSONObject();
+ message1.put("value", "message1");
+ message2.put("value", "message2");
+ MetronError error = new MetronError().withRawMessages(Arrays.asList(message1, message2));
+
+ JSONObject errorJSON = error.getJSONObject();
+
+ assertEquals("{\"value\":\"message1\"}", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName() + "_0"));
+ assertEquals("{\"value\":\"message2\"}", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName() + "_1"));
+
+ // NOTE(review): getBytes() without a charset uses the platform default; presumably harmless
+ // for this ASCII-only literal - confirm.
+ error = new MetronError().addRawMessage("raw message".getBytes());
+ errorJSON = error.getJSONObject();
+ assertEquals("raw message", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
+ // It's unclear if we need a rawMessageBytes field so commenting out for now
+ //assertEquals(Bytes.asList("raw message".getBytes()), errorJSON.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName()));
+ assertEquals("3b02cb29676bc448c69da1ec5eef7c89f4d6dc6a5a7ce0296ea25b207eea36be", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
+
+ error = new MetronError().addRawMessage(message1);
+ errorJSON = error.getJSONObject();
+ assertEquals("{\"value\":\"message1\"}", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
+ assertEquals("e8aaf87c8494d345aac2d612ffd94fcf0b98c975fe6c4b991e2f8280a3a0bd10", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
+ }
+
+ // With error fields configured on a single JSON raw message, the error fields are written as a
+ // comma-separated string (compared here as a set, so order does not matter) together with a hash.
+ @Test
+ public void getJSONObjectShouldIncludeErrorFields() {
+ JSONObject message = new JSONObject();
+ message.put("field1", "value1");
+ message.put("field2", "value2");
+
+ MetronError error = new MetronError().addRawMessage(message).withErrorFields(Sets.newHashSet("field1", "field2"));
+
+ JSONObject errorJSON = error.getJSONObject();
+ assertEquals(Sets.newHashSet("field1", "field2"), Sets.newHashSet(((String) errorJSON.get(Constants.ErrorFields.ERROR_FIELDS.getName())).split(",")));
+ // NOTE(review): the expected digest presumably depends on the order in which the hash set of
+ // field names is iterated when the hash is computed - confirm before changing the set type.
+ assertEquals("04a2629c39e098c3944be85f35c75876598f2b44b8e5e3f52c59fa1ac182817c", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
new file mode 100644
index 0000000..ea7583a
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MessageGettersTest {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void bytesFromPositionShouldReturnBytes() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getBinary(1)).thenReturn("bytes".getBytes(UTF_8));
+
+ MessageGetStrategy messageGetStrategy = MessageGetters.BYTES_FROM_POSITION.get("1");
+ assertEquals("bytes", new String((byte[]) messageGetStrategy.get(tuple), UTF_8));
+ }
+
+ @Test
+ public void jsonFromPositionShouldReturnJSON() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getBinary(1)).thenReturn("{\"field\":\"value\"}".getBytes(UTF_8));
+
+ JSONObject expected = new JSONObject();
+ expected.put("field", "value");
+ MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_POSITION.get("1");
+ assertEquals(expected, messageGetStrategy.get(tuple));
+ }
+
+ @Test
+ public void jsonFromPositionShouldThrowException() {
+ exception.expect(IllegalStateException.class);
+
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getBinary(1)).thenReturn("{\"field\":".getBytes(UTF_8));
+
+ MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_POSITION.get("1");
+ messageGetStrategy.get(tuple);
+ }
+
+ @Test
+ public void jsonFromFieldShouldReturnJSON() {
+ JSONObject actual = new JSONObject();
+ actual.put("field", "value");
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getValueByField("tuple_field")).thenReturn(actual);
+
+ JSONObject expected = new JSONObject();
+ expected.put("field", "value");
+ MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_FIELD.get("tuple_field");
+ assertEquals(expected, messageGetStrategy.get(tuple));
+ }
+
+ @Test
+ public void objectFromFieldShouldReturnObject() {
+ Object actual = "object";
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getValueByField("tuple_field")).thenReturn(actual);
+
+ Object expected = "object";
+ MessageGetStrategy messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("tuple_field");
+ assertEquals(expected, messageGetStrategy.get(tuple));
+ }
+
+ @Test
+ public void defaultBytesFromPositionShouldReturnBytes() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getBinary(0)).thenReturn("bytes".getBytes(UTF_8));
+
+ MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
+ assertEquals("bytes", new String((byte[]) messageGetStrategy.get(tuple), UTF_8));
+ }
+
+ @Test
+ public void defaultJSONFromPositionShouldReturnJSON() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getBinary(0)).thenReturn("{\"field\":\"value\"}".getBytes(UTF_8));
+
+ JSONObject expected = new JSONObject();
+ expected.put("field", "value");
+ MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_POSITION.get();
+ assertEquals(expected, messageGetStrategy.get(tuple));
+ }
+
+ @Test
+ public void defaultJSONFromFieldShouldReturnJSON() {
+ JSONObject actual = new JSONObject();
+ actual.put("field", "value");
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getValueByField("message")).thenReturn(actual);
+
+ JSONObject expected = new JSONObject();
+ expected.put("field", "value");
+ MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get();
+ assertEquals(expected, messageGetStrategy.get(tuple));
+ }
+
+ @Test
+ public void defaultObjectFromFieldShouldReturnObject() {
+ Object actual = "object";
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getValueByField("message")).thenReturn(actual);
+
+ Object expected = "object";
+ MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_OBJECT_FROM_FIELD.get();
+ assertEquals(expected, messageGetStrategy.get(tuple));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
index f11a5c9..77ea9da 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
@@ -17,6 +17,10 @@
*/
package org.apache.metron.common.utils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.storm.task.OutputCollector;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -25,6 +29,12 @@ import java.io.IOException;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class ErrorUtilsTest {
@@ -62,4 +72,15 @@ public class ErrorUtilsTest {
exception.expectCause(instanceOf(IOException.class));
ErrorUtils.RuntimeErrors.ILLEGAL_STATE.throwRuntime("illegal state happened", new IOException("bad io"));
}
+
+ @Test
+ public void handleErrorShouldEmitAndReportError() throws Exception {
+ Throwable e = new Exception("error");
+ MetronError error = new MetronError().withMessage("error message").withThrowable(e);
+ OutputCollector collector = mock(OutputCollector.class);
+
+ ErrorUtils.handleError(collector, error);
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+ verify(collector, times(1)).reportError(any());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java
new file mode 100644
index 0000000..3037341
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/** Compares {@code HashUtils.getMessageHash} results with fixed expected digests for JSON messages and raw bytes. */
+@SuppressWarnings({"unchecked"})
+public class HashUtilsTest {
+  @Test
+  public void getMessageHashShouldReturnHashForHashFields() {
+    Collection<String> hashFields = Arrays.asList("field2", "field3");
+    assertEquals("6eab1c2c827387803ce457c76552f0511858fc1f9505c7dc620e198c0d1f4d02", HashUtils.getMessageHash(threeFieldMessage(), hashFields));
+  }
+
+  @Test
+  public void getMessageHashShouldReturnHashForMessage() {
+    assertEquals("a76cdafc5aa49180c0b22c78d4415c505f9997c54847cec6c623f4cacf6a2811", HashUtils.getMessageHash(threeFieldMessage()));
+  }
+
+  @Test
+  public void getMessageHashShouldReturnHashForBytes() {
+    assertEquals("ab530a13e45914982b79f9b7e3fba994cfd1f3fb22f71cea1afbf02b460c6d1d", HashUtils.getMessageHash("message".getBytes(UTF_8)));
+  }
+
+  private static JSONObject threeFieldMessage() {
+    JSONObject message = new JSONObject();
+    message.put("field1", "value1");
+    message.put("field2", "value2");
+    message.put("field3", "value3");
+    return message;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index c2c10af..27e9173 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -26,7 +26,7 @@ kafka.start=WHERE_I_LEFT_OFF
##### Indexing #####
index.input.topic=indexing
-index.error.topic=indexing_error
+index.error.topic=indexing
writer.class.name=org.apache.metron.elasticsearch.writer.ElasticsearchWriter
##### ElasticSearch #####
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index acc1565..87c0081 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.metron.elasticsearch.integration;
-import org.apache.metron.common.Constants;
import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter;
import org.apache.metron.indexing.integration.IndexingIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
@@ -26,7 +26,6 @@ import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.Processor;
import org.apache.metron.integration.ProcessorResult;
import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.integration.components.KafkaComponent;
import java.io.File;
@@ -76,7 +75,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
throw new IllegalStateException("Unable to retrieve indexed documents.", e);
}
if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
- errors = kafkaComponent.readMessages(Constants.INDEXING_ERROR_TOPIC);
+ errors = kafkaComponent.readMessages(ERROR_TOPIC);
if(errors.size() > 0){
return ReadinessState.READY;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/main/config/enrichment.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
index 84d8461..c905d30 100644
--- a/metron-platform/metron-enrichment/src/main/config/enrichment.properties
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
@@ -20,8 +20,8 @@
kafka.zk=node1:2181
kafka.broker=node1:6667
enrichment.output.topic=indexing
-enrichment.error.topic=enrichments_error
-threat.intel.error.topic=threatintel_error
+enrichment.error.topic=indexing
+threat.intel.error.topic=indexing
##### Metrics #####
@@ -71,7 +71,11 @@ bolt.hbase.partitioner.region.info.refresh.interval.mins=60
##### Threat Intel #####
-threat.intel.tracker.table=
-threat.intel.tracker.cf=
+threat.intel.tracker.table=access_tracker
+threat.intel.tracker.cf=t
threat.intel.ip.table=
threat.intel.ip.cf=
+threat.intel.simple.hbase.table=threatintel
+threat.intel.simple.hbase.cf=t
+enrichment.simple.hbase.table=enrichment
+enrichment.simple.hbase.cf=t
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index bb77b84..d6f1304 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,6 +18,7 @@
package org.apache.metron.enrichment.bolt;
+import org.apache.metron.common.error.MetronError;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -41,7 +42,9 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -225,6 +228,12 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
catch(Exception e) {
LOG.error(e.getMessage(), e);
error = true;
+ MetronError metronError = new MetronError()
+ .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+ .withThrowable(e)
+ .withErrorFields(new HashSet() {{ add(field); }})
+ .addRawMessage(rawMessage);
+ ErrorUtils.handleError(collector, metronError);
continue;
}
}
@@ -257,11 +266,14 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
// errors, so this is made available in order to ensure ERROR_STREAM is output properly.
protected void handleError(String key, JSONObject rawMessage, String subGroup, JSONObject enrichedMessage, Exception e) {
LOG.error("[Metron] Unable to enrich message: " + rawMessage, e);
- JSONObject error = ErrorUtils.generateErrorMessage("Enrichment problem: " + rawMessage, e);
if (key != null) {
collector.emit(enrichmentType, new Values(key, enrichedMessage, subGroup));
}
- collector.emit(ERROR_STREAM, new Values(error));
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+ .withThrowable(e)
+ .addRawMessage(rawMessage);
+ ErrorUtils.handleError(collector, error);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index 101d056..3bbb3f5 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -17,27 +17,29 @@
*/
package org.apache.metron.enrichment.bolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
import com.google.common.base.Joiner;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
-import org.apache.metron.common.bolt.ConfiguredBolt;
-import org.apache.metron.common.utils.ErrorUtils;
-import org.json.simple.JSONObject;
+import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
@@ -48,6 +50,9 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
protected transient CacheLoader<String, Map<String, V>> loader;
protected transient LoadingCache<String, Map<String, V>> cache;
+ private transient MessageGetStrategy keyGetStrategy;
+ private transient MessageGetStrategy subgroupGetStrategy;
+ private transient MessageGetStrategy messageGetStrategy;
protected Long maxCacheSize;
protected Long maxTimeRetain;
@@ -68,6 +73,9 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
super.prepare(map, topologyContext, outputCollector);
+ keyGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("key");
+ subgroupGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("subgroup");
+ messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("message");
this.collector = outputCollector;
if (this.maxCacheSize == null) {
throw new IllegalStateException("maxCacheSize must be specified");
@@ -91,10 +99,10 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
@Override
public void execute(Tuple tuple) {
String streamId = tuple.getSourceStreamId();
- String key = (String) tuple.getValueByField("key");
- String subgroup = (String) tuple.getValueByField("subgroup");
+ String key = (String) keyGetStrategy.get(tuple);
+ String subgroup = (String) subgroupGetStrategy.get(tuple);
streamId = Joiner.on(":").join("" + streamId, subgroup == null?"":subgroup);
- V message = (V) tuple.getValueByField("message");
+ V message = (V) messageGetStrategy.get(tuple);
try {
Map<String, V> streamMessageMap = cache.get(key);
if (streamMessageMap.containsKey(streamId)) {
@@ -127,10 +135,13 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
}
} catch (Exception e) {
LOG.error("[Metron] Unable to join messages: " + message, e);
- JSONObject error = ErrorUtils.generateErrorMessage("Joining problem: " + message, e);
+ MetronError error = new MetronError()
+ .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+ .withMessage("Joining problem: " + message)
+ .withThrowable(e)
+ .addRawMessage(message);
+ ErrorUtils.handleError(collector, error);
collector.ack(tuple);
- collector.emit("error", new Values(error));
- collector.reportError(e);
}
}