You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/03/06 21:25:50 UTC

[3/3] incubator-metron git commit: METRON-694 Index Errors from Topologies (merrimanr) closes apache/incubator-metron#453

METRON-694 Index Errors from Topologies (merrimanr) closes apache/incubator-metron#453


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/134a2331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/134a2331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/134a2331

Branch: refs/heads/master
Commit: 134a23311c51693381fb52e6a90026420ff61b63
Parents: 27ee490
Author: merrimanr <me...@gmail.com>
Authored: Mon Mar 6 15:23:28 2017 -0600
Committer: YOUR NAME as In Apache <YO...@apache.org>
Committed: Mon Mar 6 15:23:28 2017 -0600

----------------------------------------------------------------------
 .../src/main/flux/profiler/remote.yaml          |   2 -
 .../METRON/CURRENT/role_command_order.json      |   3 +-
 .../METRON/CURRENT/configuration/metron-env.xml |   5 +-
 .../CURRENT/package/files/error_index.template  |  53 +++++
 .../package/scripts/enrichment_commands.py      |   6 +-
 .../CURRENT/package/scripts/indexing_master.py  |  10 +
 .../package/scripts/params/params_linux.py      |   3 +-
 .../package/scripts/params/status_params.py     |   2 -
 .../CURRENT/package/scripts/parser_commands.py  |  12 -
 .../docker/rpm-docker/SPECS/metron.spec         |   1 +
 .../files/es_templates/error_index.template     |  53 +++++
 .../templates/config/elasticsearch.global.json  |   3 +-
 metron-platform/metron-common/README.md         |  37 ++++
 metron-platform/metron-common/pom.xml           |   8 +-
 .../org/apache/metron/common/Constants.java     |  52 ++++-
 .../common/configuration/FieldValidator.java    |   2 +
 .../apache/metron/common/error/MetronError.java | 218 +++++++++++++++++++
 .../common/message/BytesFromPosition.java       |  36 +++
 .../metron/common/message/JSONFromField.java    |  37 ++++
 .../metron/common/message/JSONFromPosition.java |  49 +++++
 .../common/message/MessageGetStrategy.java      |  25 +++
 .../metron/common/message/MessageGetters.java   |  68 ++++++
 .../metron/common/message/ObjectFromField.java  |  36 +++
 .../apache/metron/common/utils/ErrorUtils.java  |  87 +-------
 .../apache/metron/common/utils/HashUtils.java   |  44 ++++
 .../metron/common/error/MetronErrorTest.java    | 109 ++++++++++
 .../common/message/MessageGettersTest.java      | 134 ++++++++++++
 .../metron/common/utils/ErrorUtilsTest.java     |  21 ++
 .../metron/common/utils/HashUtilsTest.java      |  55 +++++
 .../src/main/config/elasticsearch.properties    |   2 +-
 .../ElasticsearchIndexingIntegrationTest.java   |   5 +-
 .../src/main/config/enrichment.properties       |  12 +-
 .../enrichment/bolt/GenericEnrichmentBolt.java  |  16 +-
 .../apache/metron/enrichment/bolt/JoinBolt.java |  43 ++--
 .../bolt/BulkMessageWriterBoltTest.java         |   8 +-
 .../bolt/GenericEnrichmentBoltTest.java         |  44 +++-
 .../metron/enrichment/bolt/JoinBoltTest.java    |  23 +-
 .../integration/EnrichmentIntegrationTest.java  |  76 ++++---
 metron-platform/metron-indexing/README.md       |   2 +-
 .../main/config/zookeeper/indexing/error.json   |  17 ++
 .../src/main/flux/indexing/remote.yaml          |   5 +-
 .../integration/IndexingIntegrationTest.java    |   6 +-
 .../metron/integration/ProcessorResult.java     |  23 +-
 .../integration/processors/KafkaMessageSet.java |   7 +-
 .../integration/processors/KafkaProcessor.java  |  16 +-
 metron-platform/metron-parsers/README.md        |   8 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  |  69 ++++--
 .../apache/metron/parsers/bolt/WriterBolt.java  |  34 ++-
 .../metron/parsers/bolt/WriterHandler.java      |   8 +-
 .../parsers/topology/ParserTopologyBuilder.java |  39 +---
 .../parsers/topology/ParserTopologyCLI.java     |   2 -
 .../metron/parsers/bolt/ParserBoltTest.java     |  71 ++++++
 .../metron/parsers/bolt/WriterBoltTest.java     |  36 ++-
 .../integration/ParserIntegrationTest.java      |   9 +-
 .../components/ParserTopologyComponent.java     |   9 +-
 .../integration/WriterBoltIntegrationTest.java  |  43 ++--
 .../metron-solr/src/main/config/solr.properties |   2 +-
 .../SolrIndexingIntegrationTest.java            |   9 +-
 .../test/error/MetronErrorJSONMatcher.java      |  42 ++++
 .../metron/writer/BulkWriterComponent.java      |  48 ++--
 .../writer/bolt/BulkMessageWriterBolt.java      |  43 ++--
 .../metron/writer/message/MessageGetter.java    |  26 ---
 .../metron/writer/message/MessageGetters.java   |  37 ----
 .../writer/message/NamedMessageGetter.java      |  34 ---
 .../metron/writer/message/RawMessageGetter.java |  50 -----
 .../metron/writer/BulkWriterComponentTest.java  | 197 +++++++++++++++++
 66 files changed, 1752 insertions(+), 540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 0a26b73..f97b97a 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -127,8 +127,6 @@ bolts:
         configMethods:
             -   name: "withMessageWriter"
                 args: [ref: "kafkaWriter"]
-            -   name: "withMessageGetter"
-                args: ["NAMED"]
 
 streams:
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
index e08f401..0b04f12 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/addon-services/METRON/CURRENT/role_command_order.json
@@ -4,8 +4,9 @@
     "general_deps" : {
         "_comment" : "dependencies for all cases",
         "METRON_INDEXING-INSTALL" : ["METRON_PARSERS-INSTALL"],
+        "METRON_ENRICHMENT-INSTALL" : ["METRON_INDEXING-INSTALL"],
         "METRON_PARSERS-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START"],
-        "METRON_ENRICHMENT_MASTER-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START", "HBASE_MASTER-START", "HBASE_REGIONSERVER-START"],
+        "METRON_ENRICHMENT_MASTER-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START", "HBASE_MASTER-START", "HBASE_REGIONSERVER-START", "METRON_INDEXING-START"],
         "METRON_ENRICHMENT_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_ENRICHMENT_MASTER-START"],
         "METRON_INDEXING-START" : ["NAMENODE-START", "ZOOKEEPER_SERVER-START", "KAFKA_BROKER-START", "STORM_REST_API-START","METRON_PARSERS-START"],
         "METRON_SERVICE_CHECK-SERVICE_CHECK" : ["METRON_PARSERS-START","METRON_INDEXING-START"]

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index eeb2037..cbff4a9 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -150,7 +150,8 @@
 {
 "es.clustername": "{{ es_cluster_name }}",
 "es.ip": "{{ es_url }}",
-"es.date.format": "yyyy.MM.dd.HH"
+"es.date.format": "yyyy.MM.dd.HH",
+"parser.error.topic": "indexing"
 }
         </value>
         <value-attributes>
@@ -171,7 +172,7 @@ kafka.broker={{ kafka_brokers }}
 kafka.start=WHERE_I_LEFT_OFF
 ##### Indexing #####
 index.input.topic=indexing
-index.error.topic=indexing_error
+index.error.topic=indexing
 writer.class.name=org.apache.metron.elasticsearch.writer.ElasticsearchWriter
 ##### Metrics #####
 #reporters

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template
new file mode 100644
index 0000000..070c90f
--- /dev/null
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template
@@ -0,0 +1,53 @@
+{
+  "template": "error_index*",
+  "mappings": {
+    "error_doc": {
+      "_timestamp": {
+        "enabled": true
+      },
+      "properties": {
+        "exception": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "hostname": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "stack": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "time": {
+          "type": "date",
+          "format": "epoch_millis"
+        },
+        "message": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "raw_message": {
+          "type": "string",
+          "index": "not_analyzed",
+          "ignore_above": 8191
+        },
+        "raw_message_bytes": {
+          "type": "binary",
+          "index": "no"
+        },
+        "raw_message_hash": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "source_type": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "error_type": {
+          "type": "string",
+          "index": "not_analyzed"
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index bc73c87..817f266 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -28,8 +28,6 @@ class EnrichmentCommands:
     __params = None
     __enrichment_topology = None
     __enrichment_topic = None
-    __enrichment_error_topic = None
-    __threat_intel_error_topic = None
     __configured = False
 
     def __init__(self, params):
@@ -38,8 +36,6 @@ class EnrichmentCommands:
         self.__params = params
         self.__enrichment_topology = params.metron_enrichment_topology
         self.__enrichment_topic = params.metron_enrichment_topic
-        self.__enrichment_error_topic = params.metron_enrichment_error_topic
-        self.__threat_intel_error_topic = params.metron_threat_intel_error_topic
         self.__configured = os.path.isfile(self.__params.enrichment_configured_flag_file)
 
     def is_configured(self):
@@ -121,7 +117,7 @@ class EnrichmentCommands:
         retention_bytes = retention_gigabytes * 1024 * 1024 * 1024
 
         Logger.info("Creating topics for enrichment")
-        topics = [self.__enrichment_topic, self.__enrichment_error_topic, self.__threat_intel_error_topic]
+        topics = [self.__enrichment_topic]
         for topic in topics:
             Logger.info("Creating topic'{0}'".format(topic))
             Execute(command_template.format(self.__params.kafka_bin_dir,

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
index efc048d..53fb17b 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
@@ -95,6 +95,11 @@ class Indexing(Script):
              content=StaticFile('yaf_index.template')
              )
 
+        File(params.error_index_path,
+             mode=0755,
+             content=StaticFile('error_index.template')
+             )
+
         bro_cmd = ambari_format(
             'curl -s -XPOST http://{es_http_url}/_template/bro_index -d @{bro_index_path}')
         Execute(bro_cmd, logoutput=True)
@@ -104,6 +109,9 @@ class Indexing(Script):
         yaf_cmd = ambari_format(
             'curl -s -XPOST http://{es_http_url}/_template/yaf_index -d @{yaf_index_path}')
         Execute(yaf_cmd, logoutput=True)
+        error_cmd = ambari_format(
+            'curl -s -XPOST http://{es_http_url}/_template/error_index -d @{error_index_path}')
+        Execute(error_cmd, logoutput=True)
 
     def elasticsearch_template_delete(self, env):
         from params import params
@@ -115,6 +123,8 @@ class Indexing(Script):
         Execute(snort_cmd, logoutput=True)
         yaf_cmd = ambari_format('curl -s -XDELETE "http://{es_http_url}/yaf_index*"')
         Execute(yaf_cmd, logoutput=True)
+        error_cmd = ambari_format('curl -s -XDELETE "http://{es_http_url}/error_index*"')
+        Execute(error_cmd, logoutput=True)
 
     def zeppelin_notebook_import(self, env):
         from params import params

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 2427d25..2b8276b 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -158,13 +158,12 @@ threatintel_cf = status_params.threatintel_cf
 
 metron_enrichment_topology = status_params.metron_enrichment_topology
 metron_enrichment_topic = status_params.metron_enrichment_topic
-metron_enrichment_error_topic = status_params.metron_enrichment_error_topic
-metron_threat_intel_error_topic = status_params.metron_threat_intel_error_topic
 
 # ES Templates
 bro_index_path = tmp_dir + "/bro_index.template"
 snort_index_path = tmp_dir + "/snort_index.template"
 yaf_index_path = tmp_dir + "/yaf_index.template"
+error_index_path = tmp_dir + "/error_index.template"
 
 # Zeppelin Notebooks
 metron_config_zeppelin_path = format("{metron_config_path}/zeppelin")

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
index e8a8568..961102f 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py
@@ -34,8 +34,6 @@ parsers_configured_flag_file = metron_zookeeper_config_path + '/../metron_parser
 # Enrichment
 metron_enrichment_topology = 'enrichment'
 metron_enrichment_topic = 'enrichments'
-metron_enrichment_error_topic = 'enrichments_error'
-metron_threat_intel_error_topic = 'threatintel_error'
 
 enrichment_table = 'enrichment'
 enrichment_cf = 't'

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index e6f0f3a..574d7e8 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -118,18 +118,6 @@ class ParserCommands:
                                             num_partitions,
                                             replication_factor,
                                             retention_bytes))
-        Logger.info("Creating topics for error handling")
-        Execute(command_template.format(self.__params.kafka_bin_dir,
-                                        self.__params.zookeeper_quorum,
-                                        "parser_invalid",
-                                        num_partitions,
-                                        replication_factor,
-                                        retention_bytes))
-        Execute(command_template.format(self.__params.kafka_bin_dir,
-                                        self.__params.zookeeper_quorum,
-                                        "parser_error",
-                                        num_partitions, replication_factor,
-                                        retention_bytes))
         Logger.info("Done creating Kafka topics")
 
     def start_parser_topologies(self):

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 3e8a11c..2c619e1 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -260,6 +260,7 @@ This package installs the Metron Indexing files
 %{metron_home}/config/zookeeper/indexing/websphere.json
 %{metron_home}/config/zookeeper/indexing/yaf.json
 %{metron_home}/config/zookeeper/indexing/asa.json
+%{metron_home}/config/zookeeper/indexing/error.json
 %{metron_home}/config/zeppelin/metron/metron-yaf-telemetry.json
 
 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template b/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template
new file mode 100644
index 0000000..070c90f
--- /dev/null
+++ b/metron-deployment/roles/metron_elasticsearch_templates/files/es_templates/error_index.template
@@ -0,0 +1,53 @@
+{
+  "template": "error_index*",
+  "mappings": {
+    "error_doc": {
+      "_timestamp": {
+        "enabled": true
+      },
+      "properties": {
+        "exception": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "hostname": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "stack": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "time": {
+          "type": "date",
+          "format": "epoch_millis"
+        },
+        "message": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "raw_message": {
+          "type": "string",
+          "index": "not_analyzed",
+          "ignore_above": 8191
+        },
+        "raw_message_bytes": {
+          "type": "binary",
+          "index": "no"
+        },
+        "raw_message_hash": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "source_type": {
+          "type": "string",
+          "index": "not_analyzed"
+        },
+        "error_type": {
+          "type": "string",
+          "index": "not_analyzed"
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json b/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
index 8177102..87af1c0 100644
--- a/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
+++ b/metron-deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
@@ -2,5 +2,6 @@
   "es.clustername": "{{ elasticsearch_cluster_name }}",
   "es.ip": "{{ groups.search[0] }}",
   "es.port": "{{ elasticsearch_transport_port }}",
-  "es.date.format": "yyyy.MM.dd.HH"
+  "es.date.format": "yyyy.MM.dd.HH",
+  "parser.error.topic": "indexing"
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index 97ecd58..d66ab0c 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -784,6 +784,7 @@ This configuration is stored in zookeeper, but looks something like
   "es.ip": "node1",
   "es.port": "9300",
   "es.date.format": "yyyy.MM.dd.HH",
+  "parser.error.topic": "indexing",
   "fieldValidations" : [
               {
                 "input" : [ "ip_src_addr", "ip_dst_addr" ],
@@ -901,3 +902,39 @@ Usage examples:
 * To dump the existing configs from zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m DUMP`
 * To push the configs into zookeeper on the singlenode vagrant machine: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PUSH -i $METRON_HOME/config/zookeeper`
 * To pull the configs from zookeeper to the singlenode vagrant machine disk: `$METRON_HOME/bin/zk_load_configs.sh -z node1:2181 -m PULL -o $METRON_HOME/config/zookeeper -f`
+
+# Topology Errors
+
+Errors generated in Metron topologies are transformed into JSON format and follow this structure:
+
+```
+{
+  "exception": "java.lang.IllegalStateException: Unable to parse Message: ...",
+  "failed_sensor_type": "bro",
+  "stack": "java.lang.IllegalStateException: Unable to parse Message: ...",
+  "hostname": "node1",
+  "source:type": "error",
+  "raw_message": "{\"http\": {\"ts\":1488809627.000000.31915,\"uid\":\"C9JpSd2vFAWo3mXKz1\", ...",
+  "error_hash": "f7baf053f2d3c801a01d196f40f3468e87eea81788b2567423030100865c5061",
+  "error_type": "parser_error",
+  "message": "Unable to parse Message: {\"http\": {\"ts\":1488809627.000000.31915,\"uid\":\"C9JpSd2vFAWo3mXKz1\", ...",
+  "timestamp": 1488809630698
+}
+```
+
+Each topology can be configured to send error messages to a specific Kafka topic.  The parser topologies retrieve this value from the `parser.error.topic` setting in the global config:
+```
+{
+  "es.clustername": "metron",
+  "es.ip": "node1",
+  "es.port": "9300",
+  "es.date.format": "yyyy.MM.dd.HH",
+  "parser.error.topic": "indexing"
+}
+```
+
+Error topics for enrichment and threat intel errors are passed into the enrichment topology as flux properties named `enrichment.error.topic` and `threat.intel.error.topic`.  These properties can be found in `$METRON_HOME/config/enrichment.properties`.
+  
+The error topic for indexing errors is passed into the indexing topology as a flux property named `index.error.topic`.  This property can be found in either `$METRON_HOME/config/elasticsearch.properties` or `$METRON_HOME/config/solr.properties` depending on the search engine selected.
+
+By default all error messages are sent to the `indexing` topic so that they are indexed and archived, just like other messages.  The indexing config for error messages can be found at `$METRON_HOME/config/zookeeper/indexing/error.json`.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index d51762a..8d1e183 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -308,7 +308,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
@@ -363,7 +362,12 @@
             <artifactId>stream</artifactId>
             <version>2.9.5</version>
         </dependency>
-
+        <dependency>
+          <groupId>org.apache.metron</groupId>
+          <artifactId>metron-test-utilities</artifactId>
+          <version>${project.parent.version}</version>
+          <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 4230678..29be31e 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -27,14 +27,8 @@ public class Constants {
   public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
   public static final String SENSOR_TYPE = "source.type";
   public static final String ENRICHMENT_TOPIC = "enrichments";
-  public static final String ENRICHMENT_ERROR_TOPIC = "enrichments_error";
-  public static final String THREAT_INTEL_ERROR_TOPIC = "threatintel_error";
   public static final String INDEXING_TOPIC = "indexing";
-  public static final String INDEXING_ERROR_TOPIC = "indexing_error";
-  public static final String DEFAULT_PARSER_ERROR_TOPIC = "parser_error";
-  public static final String DEFAULT_PARSER_INVALID_TOPIC = "parser_invalid";
   public static final String ERROR_STREAM = "error";
-  public static final String INVALID_STREAM = "invalid";
   public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
   public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
 
@@ -72,5 +66,51 @@ public class Constants {
     }
   }
 
+  public enum ErrorFields {
+    MESSAGE("message")
+    ,FAILED_SENSOR_TYPE("failed_sensor_type")
+    ,ERROR_TYPE("error_type")
+    ,EXCEPTION("exception")
+    ,STACK("stack")
+    ,TIMESTAMP("timestamp")
+    ,HOSTNAME("hostname")
+    ,RAW_MESSAGE("raw_message")
+    ,RAW_MESSAGE_BYTES("raw_message_bytes")
+    ,ERROR_FIELDS("error_fields")
+    ,ERROR_HASH("error_hash")
+    ;
+
+    private String name;
+
+    ErrorFields(String name) {
+      this.name = name;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  public enum ErrorType {
+
+     PARSER_ERROR("parser_error")
+    ,PARSER_INVALID("parser_invalid")
+    ,ENRICHMENT_ERROR("enrichments_error")
+    ,THREAT_INTEL_ERROR("threatintel_error")
+    ,INDEXING_ERROR("indexing_error")
+    ,DEFAULT_ERROR("error")
+    ;
+
+    private String type;
+
+    ErrorType(String type) {
+      this.type = type;
+    }
+
+    public String getType() {
+      return type;
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
index 3302426..970566c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
@@ -66,6 +66,8 @@ public class FieldValidator implements Serializable {
         for(Object inputO : (List<Object>)inputObj) {
           input.add(inputO.toString());
         }
+      } else {
+        input = new ArrayList<>();
       }
       config = Config.CONFIG.get(validatorConfig, Map.class);
       if(config == null) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
new file mode 100644
index 0000000..2837d34
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.error;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.Constants.ErrorType;
+import org.apache.metron.common.utils.HashUtils;
+import org.json.simple.JSONObject;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.metron.common.Constants.ErrorFields;
+
+/**
+ * Mutable, fluent description of an error that can be rendered as a JSON error record.
+ *
+ * <p>Configure an instance through the {@code with*} / {@code add*} methods (each returns
+ * {@code this}) and call {@link #getJSONObject()} to build the record. The class performs no
+ * synchronization.
+ */
+public class MetronError {
+
+  private String message;
+  private Throwable throwable;
+  private String sensorType = "error";
+  private ErrorType errorType = ErrorType.DEFAULT_ERROR;
+  private Set<String> errorFields;
+  private List<Object> rawMessages;
+
+  /**
+   * @param message explicit error message; when left unset the throwable's message is used instead
+   * @return this instance
+   */
+  public MetronError withMessage(String message) {
+    this.message = message;
+    return this;
+  }
+
+  /**
+   * @param throwable cause of the error; supplies the exception and stack fields of the record
+   * @return this instance
+   */
+  public MetronError withThrowable(Throwable throwable) {
+    this.throwable = throwable;
+    return this;
+  }
+
+  /**
+   * @param sensorType value written to the failed-sensor-type field (defaults to {@code "error"})
+   * @return this instance
+   */
+  public MetronError withSensorType(String sensorType) {
+    this.sensorType = sensorType;
+    return this;
+  }
+
+  /**
+   * @param errorType category of the error (defaults to {@link ErrorType#DEFAULT_ERROR})
+   * @return this instance
+   */
+  public MetronError withErrorType(ErrorType errorType) {
+    this.errorType = errorType;
+    return this;
+  }
+
+  /**
+   * @param errorFields names of the raw-message fields the error hash is restricted to; only
+   *                    consulted when the single raw message is a {@link JSONObject}
+   * @return this instance
+   */
+  public MetronError withErrorFields(Set<String> errorFields) {
+    this.errorFields = errorFields;
+    return this;
+  }
+
+  /**
+   * Appends one raw message; {@code null} is ignored.
+   *
+   * @param rawMessage the message that failed, or {@code null}
+   * @return this instance
+   */
+  public MetronError addRawMessage(Object rawMessage) {
+    if (rawMessage != null) {
+      if (this.rawMessages == null) {
+        this.rawMessages = new ArrayList<>();
+      }
+      this.rawMessages.add(rawMessage);
+    }
+    return this;
+  }
+
+  /**
+   * Replaces the raw messages with a copy of the given list.
+   *
+   * @param rawMessages the messages that failed; {@code null} clears the current messages
+   * @return this instance
+   */
+  public MetronError withRawMessages(List<Object> rawMessages) {
+    // Copy rather than alias: a later addRawMessage() must neither mutate the caller's list
+    // nor fail on a fixed-size/immutable one (e.g. Arrays.asList, Collections.singletonList).
+    this.rawMessages = rawMessages == null ? null : new ArrayList<>(rawMessages);
+    return this;
+  }
+
+  /**
+   * @return the configured throwable, or an empty Optional when none was set
+   */
+  public Optional<Throwable> getThrowable() {
+    return Optional.ofNullable(throwable);
+  }
+
+  /**
+   * Builds the JSON error record.
+   *
+   * <p>The record always carries the sensor type {@code "error"}, the failed sensor type, the
+   * error type and a timestamp. The message, exception/stack, hostname, raw message(s) and
+   * error hash are added only when the corresponding information is available.
+   *
+   * @return a newly created JSON object describing this error
+   */
+  @SuppressWarnings({"unchecked"})
+  public JSONObject getJSONObject() {
+    JSONObject errorMessage = new JSONObject();
+    errorMessage.put(Constants.SENSOR_TYPE, "error");
+    errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorType);
+    errorMessage.put(ErrorFields.ERROR_TYPE.getName(), errorType.getType());
+
+    addMessageString(errorMessage);
+    addStacktrace(errorMessage);
+    addTimestamp(errorMessage);
+    addHostname(errorMessage);
+    addRawMessages(errorMessage);
+    addErrorHash(errorMessage);
+
+    return errorMessage;
+  }
+
+  /** Writes the explicit message, falling back to the throwable's message when none was set. */
+  @SuppressWarnings({"unchecked"})
+  private void addMessageString(JSONObject errorMessage) {
+    if (message != null) {
+      errorMessage.put(ErrorFields.MESSAGE.getName(), message);
+    } else if (throwable != null) {
+      errorMessage.put(ErrorFields.MESSAGE.getName(), throwable.getMessage());
+    }
+  }
+
+  /** Writes the throwable's string form and full stack trace, if a throwable was set. */
+  @SuppressWarnings({"unchecked"})
+  private void addStacktrace(JSONObject errorMessage) {
+    if (throwable != null) {
+      errorMessage.put(ErrorFields.EXCEPTION.getName(), throwable.toString());
+      errorMessage.put(ErrorFields.STACK.getName(), ExceptionUtils.getStackTrace(throwable));
+    }
+  }
+
+  /** Stamps the record with the current wall-clock time in epoch milliseconds. */
+  @SuppressWarnings({"unchecked"})
+  private void addTimestamp(JSONObject errorMessage) {
+    errorMessage.put(ErrorFields.TIMESTAMP.getName(), System.currentTimeMillis());
+  }
+
+  /** Writes the local host name; the field is omitted when the host cannot be resolved. */
+  @SuppressWarnings({"unchecked"})
+  private void addHostname(JSONObject errorMessage) {
+    try {
+      errorMessage.put(ErrorFields.HOSTNAME.getName(), InetAddress.getLocalHost().getHostName());
+    } catch (UnknownHostException ignored) {
+      // Leave the hostname field off if it cannot be found
+    }
+  }
+
+  /**
+   * Writes every raw message as a string. A single message uses the plain raw-message field
+   * name; multiple messages get an {@code _<index>} suffix. A companion raw-bytes field was
+   * deliberately left out because it is unclear whether it is needed.
+   */
+  @SuppressWarnings({"unchecked"})
+  private void addRawMessages(JSONObject errorMessage) {
+    if (rawMessages == null) {
+      return;
+    }
+    String baseField = ErrorFields.RAW_MESSAGE.getName();
+    boolean singleMessage = rawMessages.size() == 1;
+    for (int i = 0; i < rawMessages.size(); i++) {
+      Object rawMessage = rawMessages.get(i);
+      String rawMessageField = singleMessage ? baseField : baseField + "_" + i;
+      String rawMessageValue;
+      if (rawMessage instanceof byte[]) {
+        rawMessageValue = Bytes.toString((byte[]) rawMessage);
+      } else if (rawMessage instanceof JSONObject) {
+        rawMessageValue = ((JSONObject) rawMessage).toJSONString();
+      } else {
+        rawMessageValue = rawMessage.toString();
+      }
+      errorMessage.put(rawMessageField, rawMessageValue);
+    }
+  }
+
+  /**
+   * Writes a hash of the raw message, but only when there is exactly one. For a
+   * {@link JSONObject} the hash is limited to {@code errorFields} when those are set (and the
+   * field names are recorded as a comma-separated list); otherwise the whole message is hashed.
+   */
+  @SuppressWarnings({"unchecked"})
+  private void addErrorHash(JSONObject errorMessage) {
+    if (rawMessages == null || rawMessages.size() != 1) {
+      return;
+    }
+    Object rawMessage = rawMessages.get(0);
+    String hashField = ErrorFields.ERROR_HASH.getName();
+    if (rawMessage instanceof JSONObject) {
+      JSONObject rawJSON = (JSONObject) rawMessage;
+      if (errorFields != null) {
+        errorMessage.put(ErrorFields.ERROR_FIELDS.getName(), String.join(",", errorFields));
+        errorMessage.put(hashField, HashUtils.getMessageHash(rawJSON, errorFields));
+      } else {
+        errorMessage.put(hashField, HashUtils.getMessageHash(rawJSON));
+      }
+    } else if (rawMessage instanceof byte[]) {
+      errorMessage.put(hashField, HashUtils.getMessageHash((byte[]) rawMessage));
+    } else {
+      errorMessage.put(hashField, HashUtils.getMessageHash(rawMessage.toString().getBytes(UTF_8)));
+    }
+  }
+
+  /**
+   * Compares all configured fields. NOTE: raw messages are compared with {@link List#equals},
+   * so {@code byte[]} elements are equal only when they are the same array instance.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    MetronError that = (MetronError) o;
+
+    if (message != null ? !message.equals(that.message) : that.message != null) {
+      return false;
+    }
+    if (throwable != null ? !throwable.equals(that.throwable) : that.throwable != null) {
+      return false;
+    }
+    if (sensorType != null ? !sensorType.equals(that.sensorType) : that.sensorType != null) {
+      return false;
+    }
+    if (errorType != null ? !errorType.equals(that.errorType) : that.errorType != null) {
+      return false;
+    }
+    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields != null) {
+      return false;
+    }
+    return rawMessages != null ? rawMessages.equals(that.rawMessages) : that.rawMessages == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = message != null ? message.hashCode() : 0;
+    result = 31 * result + (throwable != null ? throwable.hashCode() : 0);
+    result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0);
+    result = 31 * result + (errorType != null ? errorType.hashCode() : 0);
+    result = 31 * result + (errorFields != null ? errorFields.hashCode() : 0);
+    result = 31 * result + (rawMessages != null ? rawMessages.hashCode() : 0);
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
new file mode 100644
index 0000000..b73228f
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * {@link MessageGetStrategy} that returns the binary value held at a fixed tuple position.
+ */
+public class BytesFromPosition implements MessageGetStrategy {
+
+  private int position;
+
+  /** Creates a strategy that reads position 0. */
+  public BytesFromPosition() {
+    this(0);
+  }
+
+  /**
+   * @param position index of the tuple value to read
+   */
+  public BytesFromPosition(int position) {
+    this.position = position;
+  }
+
+  /**
+   * @param tuple the tuple to read from
+   * @return the result of {@code tuple.getBinary(position)}
+   */
+  @Override
+  public byte[] get(Tuple tuple) {
+    return tuple.getBinary(position);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
new file mode 100644
index 0000000..39fe9dd
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromField.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+public class JSONFromField implements MessageGetStrategy {
+
+  private String fieldValue = "message";
+
+  public JSONFromField() {};
+
+  public JSONFromField(String fieldValue) {
+    this.fieldValue = fieldValue;
+  }
+
+  @Override
+  public JSONObject get(Tuple tuple) {
+    return (JSONObject) ((JSONObject) tuple.getValueByField(fieldValue)).clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
new file mode 100644
index 0000000..4407d4f
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+public class JSONFromPosition implements MessageGetStrategy {
+
+  private int position = 0;
+
+  private ThreadLocal<JSONParser> parser = new ThreadLocal<JSONParser>() {
+    @Override
+    protected JSONParser initialValue() {
+      return new JSONParser();
+    }
+  };
+
+  public JSONFromPosition() {};
+
+  public JSONFromPosition(int position) {
+    this.position = position;
+  }
+
+  @Override
+  public JSONObject get(Tuple tuple) {
+    try {
+      return (JSONObject) parser.get().parse(new String(tuple.getBinary(position), "UTF8"));
+    } catch (Exception e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
new file mode 100644
index 0000000..0595ce1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetStrategy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Strategy for extracting a message value from a Storm {@link Tuple}.
+ */
+public interface MessageGetStrategy {
+
+  /**
+   * Extracts the message from the given tuple.
+   *
+   * @param tuple the tuple to read from
+   * @return the extracted value; its concrete type depends on the implementation
+   */
+  Object get(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
new file mode 100644
index 0000000..a496e08
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.function.Function;
+
+/**
+ * MessageGetters is a convenience enum for looking up the various implementations of MessageGetStrategy. The MessageGetStrategy
+ * abstraction returns a value from a tuple.  The implementations include:
+ * <ul>
+ *   <li>BYTES_FROM_POSITION - gets a byte array from the provided position</li>
+ *   <li>JSON_FROM_POSITION - gets a byte array from the provided position then converts to a string and parses the string to JSON</li>
+ *   <li>JSON_FROM_FIELD - gets a JSONObject from the provided field</li>
+ *   <li>OBJECT_FROM_FIELD - gets an Object from the provided field</li>
+ *   <li>DEFAULT_BYTES_FROM_POSITION - gets a byte array from position 0</li>
+ *   <li>DEFAULT_JSON_FROM_POSITION - gets a byte array from position 0 then converts to a string and parses the string to JSON</li>
+ *   <li>DEFAULT_JSON_FROM_FIELD - gets a JSONObject from the "message" field</li>
+ *   <li>DEFAULT_OBJECT_FROM_FIELD - gets an Object from the "message" field</li>
+ * </ul>
+ *
+ * <p>The non-DEFAULT constants are factories and must be used through {@link #get(String)};
+ * the DEFAULT_* constants hold a ready-made strategy and must be used through {@link #get()}.
+ */
+public enum MessageGetters {
+
+  BYTES_FROM_POSITION((String arg) -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class))),
+  JSON_FROM_POSITION((String arg) -> new JSONFromPosition(ConversionUtils.convert(arg, Integer.class))),
+  JSON_FROM_FIELD((String arg) -> new JSONFromField(arg)),
+  OBJECT_FROM_FIELD((String arg) -> new ObjectFromField(arg)),
+  DEFAULT_BYTES_FROM_POSITION(new BytesFromPosition()),
+  DEFAULT_JSON_FROM_POSITION(new JSONFromPosition()),
+  DEFAULT_JSON_FROM_FIELD(new JSONFromField()),
+  DEFAULT_OBJECT_FROM_FIELD(new ObjectFromField());
+
+  // Exactly one of these two is set per constant, depending on which constructor was used.
+  Function<String, MessageGetStrategy> messageGetStrategyFunction;
+  MessageGetStrategy messageGetStrategy;
+
+  MessageGetters(MessageGetStrategy messageGetStrategy) {
+    this.messageGetStrategy = messageGetStrategy;
+  }
+
+  MessageGetters(Function<String, MessageGetStrategy> messageGetStrategy) {
+    this.messageGetStrategyFunction = messageGetStrategy;
+  }
+
+  /**
+   * Creates a strategy configured with the given argument.
+   *
+   * @param arg the position (as a string) or field name, depending on the constant
+   * @return a new strategy instance
+   * @throws UnsupportedOperationException if this is a DEFAULT_* constant, which takes no argument
+   */
+  public MessageGetStrategy get(String arg) {
+    if (messageGetStrategyFunction == null) {
+      // Previously this surfaced as a bare NullPointerException with no hint at the cause.
+      throw new UnsupportedOperationException(name() + " does not take an argument; use get() instead");
+    }
+    return messageGetStrategyFunction.apply(arg);
+  }
+
+  /**
+   * Returns the ready-made strategy of a DEFAULT_* constant.
+   *
+   * @return the strategy, or {@code null} for constants that require an argument
+   */
+  public MessageGetStrategy get() {
+    return messageGetStrategy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
new file mode 100644
index 0000000..120c09c
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/ObjectFromField.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * {@link MessageGetStrategy} that returns the value of a named tuple field as-is.
+ */
+public class ObjectFromField implements MessageGetStrategy {
+
+  private String fieldValue;
+
+  /** Creates a strategy that reads the {@code "message"} field. */
+  public ObjectFromField() {
+    this("message");
+  }
+
+  /**
+   * @param fieldValue name of the tuple field to read
+   */
+  public ObjectFromField(String fieldValue) {
+    this.fieldValue = fieldValue;
+  }
+
+  /**
+   * @param tuple the tuple to read from
+   * @return the result of {@code tuple.getValueByField(fieldValue)}, without copying or casting
+   */
+  @Override
+  public Object get(Tuple tuple) {
+    return tuple.getValueByField(fieldValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index 3b3f426..f4e3a8d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,30 +17,17 @@
  */
 package org.apache.metron.common.utils;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.apache.commons.beanutils.Converter;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.metron.common.Constants;
-import org.json.simple.JSONObject;
+import org.apache.metron.common.error.MetronError;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Optional;
 import java.util.function.Function;
 
@@ -102,74 +89,16 @@ public class ErrorUtils {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  public static JSONObject generateErrorMessage(String message, Throwable t)
+  public static void handleError(OutputCollector collector, MetronError error)
   {
-    return generateErrorMessage(message, t, Optional.empty(), Optional.empty());
-  }
-  public static JSONObject generateErrorMessage(String message
-                                               , Throwable t
-                                               , Optional<String> sensorType
-                                               , Optional<Object> rawMessage
-                                               )
-  {
-    JSONObject error_message = new JSONObject();
-		
-		/*
-     * Save full stack trace in object.
-		 */
-    String stackTrace = ExceptionUtils.getStackTrace(t);
-
-    String exception = t.toString();
-
-
-    error_message.put("time", System.currentTimeMillis());
-    try {
-      error_message.put("hostname", InetAddress.getLocalHost().getHostName());
-    } catch (UnknownHostException ex) {
-
+    collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+    Optional<Throwable> throwable = error.getThrowable();
+    if (throwable.isPresent()) {
+      collector.reportError(throwable.get());
     }
-    if(rawMessage.isPresent()) {
-      if(rawMessage.get() instanceof byte[]) {
-        error_message.put("rawMessage", Bytes.toString((byte[])rawMessage.get()));
-        error_message.put("rawMessage_bytes", toByteArrayList((byte[])rawMessage.get()));
-      }
-      else {
-        error_message.put("rawMessage", rawMessage.get());
-      }
-    }
-    error_message.put("message", message);
-    error_message.put(Constants.SENSOR_TYPE, StringUtils.join("_", sensorType, Optional.of("error")));
-    error_message.put("exception", exception);
-    error_message.put("stack", stackTrace);
 
-    return error_message;
-  }
-
-  private static List<Byte> toByteArrayList(byte[] list) {
-    List<Byte> ret = new ArrayList<>();
-    for(byte b : list) {
-      ret.add(b);
-    }
-    return ret;
   }
 
-  public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
-    handleError(collector, t, errorStream, Optional.empty(), Optional.empty());
-  }
-  public static void handleError(OutputCollector collector
-                                , Throwable t
-                                , String errorStream
-                                , Optional<String> sensorType
-                                , Optional<Object> rawMessage
-                                )
-  {
-    JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t, sensorType, rawMessage);
-    collector.emit(errorStream, new Values(error));
-    collector.reportError(t);
-  }
-
-
 	public static String generateThreadDump() {
 		final StringBuilder dump = new StringBuilder();
 		final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java
new file mode 100644
index 0000000..b5170ce
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HashUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.json.simple.JSONObject;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class HashUtils {
+
+  public static String getMessageHash(JSONObject message, Collection<String> hashFields) {
+    List<String> hashElements = hashFields.stream().map(errorField ->
+            String.format("%s-%s", errorField, message.get(errorField))).collect(Collectors.toList());
+    return DigestUtils.sha256Hex(String.join("|", hashElements).getBytes(UTF_8));
+  }
+
+  public static String getMessageHash(JSONObject message) {
+    return DigestUtils.sha256Hex(message.toJSONString().getBytes(UTF_8));
+  }
+
+  public static String getMessageHash(byte[] bytes) {
+    return DigestUtils.sha256Hex(bytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
new file mode 100644
index 0000000..5e505a8
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.error;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Bytes;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.apache.metron.common.Constants.ErrorFields.RAW_MESSAGE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the JSON document produced by {@link MetronError#getJSONObject()}.
+ */
+public class MetronErrorTest {
+
+  // Shared raw-message fixtures; populated by setup() before each test.
+  private JSONObject message1 = new JSONObject();
+  private JSONObject message2 = new JSONObject();
+
+  /**
+   * Gives each fixture message a single "value" field.
+   */
+  @Before
+  public void setup() {
+    message1.put("value", "message1");
+    message2.put("value", "message2");
+  }
+
+  /**
+   * An error built with a message, an error type and a sensor type should expose those values,
+   * report "error" as its own sensor type, and carry a non-empty hostname and a positive timestamp.
+   */
+  @Test
+  public void getJSONObjectShouldReturnBasicInformation() {
+    MetronError error = new MetronError()
+            .withMessage("test message")
+            .withErrorType(Constants.ErrorType.PARSER_ERROR)
+            .withSensorType("sensorType");
+
+    JSONObject errorJSON = error.getJSONObject();
+    assertEquals("test message", errorJSON.get(Constants.ErrorFields.MESSAGE.getName()));
+    assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorJSON.get(Constants.ErrorFields.ERROR_TYPE.getName()));
+    assertEquals("error", errorJSON.get(Constants.SENSOR_TYPE));
+    assertEquals("sensorType", errorJSON.get(Constants.ErrorFields.FAILED_SENSOR_TYPE.getName()));
+    assertTrue(((String) errorJSON.get(Constants.ErrorFields.HOSTNAME.getName())).length() > 0);
+    assertTrue(((long) errorJSON.get(Constants.ErrorFields.TIMESTAMP.getName())) > 0);
+  }
+
+  /**
+   * An error built from a throwable should expose the throwable's string form, a stack trace
+   * that starts with it, and the throwable's message as the error message.
+   */
+  @Test
+  public void getJSONObjectShouldHandleThrowable() {
+    Throwable e = new Exception("test exception");
+    MetronError error = new MetronError().withThrowable(e);
+
+    JSONObject errorJSON = error.getJSONObject();
+    assertEquals("java.lang.Exception: test exception", errorJSON.get(Constants.ErrorFields.EXCEPTION.getName()));
+    assertTrue(((String) errorJSON.get(Constants.ErrorFields.STACK.getName())).startsWith("java.lang.Exception: test exception"));
+    assertEquals(e.getMessage(), errorJSON.get(Constants.ErrorFields.MESSAGE.getName()));
+  }
+
+  /**
+   * Covers the three ways of attaching raw messages: a list of messages (stored under
+   * index-suffixed keys), a single byte array, and a single JSON message (both stored under the
+   * plain raw-message key together with an error hash).
+   */
+  @Test
+  public void getJSONObjectShouldIncludeRawMessages() {
+    // Uses the fixtures from setup() instead of re-declaring identical local copies.
+    MetronError error = new MetronError().withRawMessages(Arrays.asList(message1, message2));
+
+    JSONObject errorJSON = error.getJSONObject();
+
+    assertEquals("{\"value\":\"message1\"}", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName() + "_0"));
+    assertEquals("{\"value\":\"message2\"}", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName() + "_1"));
+
+    // Encode explicitly as UTF-8 so the bytes (and therefore the expected hash) do not depend on
+    // the platform default charset.
+    error = new MetronError().addRawMessage("raw message".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    errorJSON = error.getJSONObject();
+    assertEquals("raw message", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
+    // It's unclear if we need a rawMessageBytes field so commenting out for now
+    //assertEquals(Bytes.asList("raw message".getBytes()), errorJSON.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName()));
+    assertEquals("3b02cb29676bc448c69da1ec5eef7c89f4d6dc6a5a7ce0296ea25b207eea36be", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
+
+    error = new MetronError().addRawMessage(message1);
+    errorJSON = error.getJSONObject();
+    assertEquals("{\"value\":\"message1\"}", errorJSON.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
+    assertEquals("e8aaf87c8494d345aac2d612ffd94fcf0b98c975fe6c4b991e2f8280a3a0bd10", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
+  }
+
+  /**
+   * When error fields are supplied, the JSON should list them as a comma-separated string and
+   * carry the expected error hash.
+   */
+  @Test
+  public void getJSONObjectShouldIncludeErrorFields() {
+    JSONObject message = new JSONObject();
+    message.put("field1", "value1");
+    message.put("field2", "value2");
+
+    MetronError error = new MetronError().addRawMessage(message).withErrorFields(Sets.newHashSet("field1", "field2"));
+
+    JSONObject errorJSON = error.getJSONObject();
+    // Compared as sets because the order of the comma-separated names is not asserted.
+    assertEquals(Sets.newHashSet("field1", "field2"), Sets.newHashSet(((String) errorJSON.get(Constants.ErrorFields.ERROR_FIELDS.getName())).split(",")));
+    assertEquals("04a2629c39e098c3944be85f35c75876598f2b44b8e5e3f52c59fa1ac182817c", errorJSON.get(Constants.ErrorFields.ERROR_HASH.getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
new file mode 100644
index 0000000..ea7583a
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/message/MessageGettersTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MessageGettersTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void bytesFromPositionShouldReturnBytes() {
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getBinary(1)).thenReturn("bytes".getBytes(UTF_8));
+
+    MessageGetStrategy messageGetStrategy = MessageGetters.BYTES_FROM_POSITION.get("1");
+    assertEquals("bytes", new String((byte[]) messageGetStrategy.get(tuple), UTF_8));
+  }
+
+  @Test
+  public void jsonFromPositionShouldReturnJSON() {
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getBinary(1)).thenReturn("{\"field\":\"value\"}".getBytes(UTF_8));
+
+    JSONObject expected = new JSONObject();
+    expected.put("field", "value");
+    MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_POSITION.get("1");
+    assertEquals(expected, messageGetStrategy.get(tuple));
+  }
+
+  @Test
+  public void jsonFromPositionShouldThrowException() {
+    exception.expect(IllegalStateException.class);
+
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getBinary(1)).thenReturn("{\"field\":".getBytes(UTF_8));
+
+    MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_POSITION.get("1");
+    messageGetStrategy.get(tuple);
+  }
+
+  @Test
+  public void jsonFromFieldShouldReturnJSON() {
+    JSONObject actual = new JSONObject();
+    actual.put("field", "value");
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getValueByField("tuple_field")).thenReturn(actual);
+
+    JSONObject expected = new JSONObject();
+    expected.put("field", "value");
+    MessageGetStrategy messageGetStrategy = MessageGetters.JSON_FROM_FIELD.get("tuple_field");
+    assertEquals(expected, messageGetStrategy.get(tuple));
+  }
+
+  @Test
+  public void objectFromFieldShouldReturnObject() {
+    Object actual = "object";
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getValueByField("tuple_field")).thenReturn(actual);
+
+    Object expected = "object";
+    MessageGetStrategy messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("tuple_field");
+    assertEquals(expected, messageGetStrategy.get(tuple));
+  }
+
+  @Test
+  public void defaultBytesFromPositionShouldReturnBytes() {
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getBinary(0)).thenReturn("bytes".getBytes(UTF_8));
+
+    MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
+    assertEquals("bytes", new String((byte[]) messageGetStrategy.get(tuple), UTF_8));
+  }
+
+  @Test
+  public void defaultJSONFromPositionShouldReturnJSON() {
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getBinary(0)).thenReturn("{\"field\":\"value\"}".getBytes(UTF_8));
+
+    JSONObject expected = new JSONObject();
+    expected.put("field", "value");
+    MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_POSITION.get();
+    assertEquals(expected, messageGetStrategy.get(tuple));
+  }
+
+  @Test
+  public void defaultJSONFromFieldShouldReturnJSON() {
+    JSONObject actual = new JSONObject();
+    actual.put("field", "value");
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getValueByField("message")).thenReturn(actual);
+
+    JSONObject expected = new JSONObject();
+    expected.put("field", "value");
+    MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get();
+    assertEquals(expected, messageGetStrategy.get(tuple));
+  }
+
+  @Test
+  public void defaultObjectFromFieldShouldReturnObject() {
+    Object actual = "object";
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getValueByField("message")).thenReturn(actual);
+
+    Object expected = "object";
+    MessageGetStrategy messageGetStrategy = MessageGetters.DEFAULT_OBJECT_FROM_FIELD.get();
+    assertEquals(expected, messageGetStrategy.get(tuple));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
index f11a5c9..77ea9da 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.metron.common.utils;
 
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.storm.task.OutputCollector;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -25,6 +29,12 @@ import java.io.IOException;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class ErrorUtilsTest {
 
@@ -62,4 +72,15 @@ public class ErrorUtilsTest {
     exception.expectCause(instanceOf(IOException.class));
     ErrorUtils.RuntimeErrors.ILLEGAL_STATE.throwRuntime("illegal state happened", new IOException("bad io"));
   }
+
+  @Test
+  public void handleErrorShouldEmitAndReportError() throws Exception {
+    Throwable e = new Exception("error");
+    MetronError error = new MetronError().withMessage("error message").withThrowable(e);
+    OutputCollector collector = mock(OutputCollector.class);
+
+    ErrorUtils.handleError(collector, error);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    verify(collector, times(1)).reportError(any());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java
new file mode 100644
index 0000000..3037341
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/HashUtilsTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings({"unchecked"})
+public class HashUtilsTest {
+
+  @Test
+  public void getMessageHashShouldReturnHashForHashFields() {
+    JSONObject message = new JSONObject();
+    message.put("field1", "value1");
+    message.put("field2", "value2");
+    message.put("field3", "value3");
+    Collection<String> fields = Arrays.asList("field2", "field3");
+    assertEquals("6eab1c2c827387803ce457c76552f0511858fc1f9505c7dc620e198c0d1f4d02", HashUtils.getMessageHash(message, fields));
+  }
+
+  @Test
+  public void getMessageHashShouldReturnHashForMessage() {
+    JSONObject message = new JSONObject();
+    message.put("field1", "value1");
+    message.put("field2", "value2");
+    message.put("field3", "value3");
+    assertEquals("a76cdafc5aa49180c0b22c78d4415c505f9997c54847cec6c623f4cacf6a2811", HashUtils.getMessageHash(message));
+  }
+
+  @Test
+  public void getMessageHashShouldReturnHashForBytes() {
+    assertEquals("ab530a13e45914982b79f9b7e3fba994cfd1f3fb22f71cea1afbf02b460c6d1d", HashUtils.getMessageHash("message".getBytes(UTF_8)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index c2c10af..27e9173 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -26,7 +26,7 @@ kafka.start=WHERE_I_LEFT_OFF
 
 ##### Indexing #####
 index.input.topic=indexing
-index.error.topic=indexing_error
+index.error.topic=indexing
 writer.class.name=org.apache.metron.elasticsearch.writer.ElasticsearchWriter
 
 ##### ElasticSearch #####

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index acc1565..87c0081 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.metron.elasticsearch.integration;
 
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter;
 import org.apache.metron.indexing.integration.IndexingIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
@@ -26,7 +26,6 @@ import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.Processor;
 import org.apache.metron.integration.ProcessorResult;
 import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.integration.components.KafkaComponent;
 
 import java.io.File;
@@ -76,7 +75,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
             throw new IllegalStateException("Unable to retrieve indexed documents.", e);
           }
           if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
-            errors = kafkaComponent.readMessages(Constants.INDEXING_ERROR_TOPIC);
+            errors = kafkaComponent.readMessages(ERROR_TOPIC);
             if(errors.size() > 0){
               return ReadinessState.READY;
             }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/main/config/enrichment.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
index 84d8461..c905d30 100644
--- a/metron-platform/metron-enrichment/src/main/config/enrichment.properties
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
@@ -20,8 +20,8 @@
 kafka.zk=node1:2181
 kafka.broker=node1:6667
 enrichment.output.topic=indexing
-enrichment.error.topic=enrichments_error
-threat.intel.error.topic=threatintel_error
+enrichment.error.topic=indexing
+threat.intel.error.topic=indexing
 
 ##### Metrics #####
 
@@ -71,7 +71,11 @@ bolt.hbase.partitioner.region.info.refresh.interval.mins=60
 
 ##### Threat Intel #####
 
-threat.intel.tracker.table=
-threat.intel.tracker.cf=
+threat.intel.tracker.table=access_tracker
+threat.intel.tracker.cf=t
 threat.intel.ip.table=
 threat.intel.ip.cf=
+threat.intel.simple.hbase.table=threatintel
+threat.intel.simple.hbase.cf=t
+enrichment.simple.hbase.table=enrichment
+enrichment.simple.hbase.cf=t

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index bb77b84..d6f1304 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.enrichment.bolt;
 
+import org.apache.metron.common.error.MetronError;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -41,7 +42,9 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -225,6 +228,12 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
             catch(Exception e) {
               LOG.error(e.getMessage(), e);
               error = true;
+              MetronError metronError = new MetronError()
+                      .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+                      .withThrowable(e)
+                      .withErrorFields(new HashSet() {{ add(field); }})
+                      .addRawMessage(rawMessage);
+              ErrorUtils.handleError(collector, metronError);
               continue;
             }
           }
@@ -257,11 +266,14 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
   // errors, so this is made available in order to ensure ERROR_STREAM is output properly.
   protected void handleError(String key, JSONObject rawMessage, String subGroup, JSONObject enrichedMessage, Exception e) {
     LOG.error("[Metron] Unable to enrich message: " + rawMessage, e);
-    JSONObject error = ErrorUtils.generateErrorMessage("Enrichment problem: " + rawMessage, e);
     if (key != null) {
       collector.emit(enrichmentType, new Values(key, enrichedMessage, subGroup));
     }
-    collector.emit(ERROR_STREAM, new Values(error));
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+            .withThrowable(e)
+            .addRawMessage(rawMessage);
+    ErrorUtils.handleError(collector, error);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/134a2331/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index 101d056..3bbb3f5 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -17,27 +17,29 @@
  */
 package org.apache.metron.enrichment.bolt;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
 import com.google.common.base.Joiner;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
-import org.apache.metron.common.bolt.ConfiguredBolt;
-import org.apache.metron.common.utils.ErrorUtils;
-import org.json.simple.JSONObject;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
@@ -48,6 +50,9 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
 
   protected transient CacheLoader<String, Map<String, V>> loader;
   protected transient LoadingCache<String, Map<String, V>> cache;
+  private transient MessageGetStrategy keyGetStrategy;
+  private transient MessageGetStrategy subgroupGetStrategy;
+  private transient MessageGetStrategy messageGetStrategy;
   protected Long maxCacheSize;
   protected Long maxTimeRetain;
 
@@ -68,6 +73,9 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
   @Override
   public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
     super.prepare(map, topologyContext, outputCollector);
+    keyGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("key");
+    subgroupGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("subgroup");
+    messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("message");
     this.collector = outputCollector;
     if (this.maxCacheSize == null) {
       throw new IllegalStateException("maxCacheSize must be specified");
@@ -91,10 +99,10 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
   @Override
   public void execute(Tuple tuple) {
     String streamId = tuple.getSourceStreamId();
-    String key = (String) tuple.getValueByField("key");
-    String subgroup = (String) tuple.getValueByField("subgroup");
+    String key = (String) keyGetStrategy.get(tuple);
+    String subgroup = (String) subgroupGetStrategy.get(tuple);
     streamId = Joiner.on(":").join("" + streamId, subgroup == null?"":subgroup);
-    V message = (V) tuple.getValueByField("message");
+    V message = (V) messageGetStrategy.get(tuple);
     try {
       Map<String, V> streamMessageMap = cache.get(key);
       if (streamMessageMap.containsKey(streamId)) {
@@ -127,10 +135,13 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
       }
     } catch (Exception e) {
       LOG.error("[Metron] Unable to join messages: " + message, e);
-      JSONObject error = ErrorUtils.generateErrorMessage("Joining problem: " + message, e);
+      MetronError error = new MetronError()
+              .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR)
+              .withMessage("Joining problem: " + message)
+              .withThrowable(e)
+              .addRawMessage(message);
+      ErrorUtils.handleError(collector, error);
       collector.ack(tuple);
-      collector.emit("error", new Values(error));
-      collector.reportError(e);
     }
   }