You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/14 14:15:40 UTC

incubator-metron git commit: METRON-66 Fix Grok parser to load patterns via HDFS closes apache/incubator-metron#44

Repository: incubator-metron
Updated Branches:
  refs/heads/master 2da0367c0 -> f32af0165


METRON-66 Fix Grok parser to load patterns via HDFS closes apache/incubator-metron#44


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/f32af016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/f32af016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/f32af016

Branch: refs/heads/master
Commit: f32af01653af4faff71ee6b8607fe06f7cb61d7a
Parents: 2da0367
Author: cestella <ce...@gmail.com>
Authored: Mon Mar 14 09:15:41 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Mar 14 09:15:41 2016 -0400

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 deployment/amazon-ec2/conf/defaults.yml         |  4 --
 deployment/roles/hadoop_setup/defaults/main.yml |  2 +-
 .../roles/metron_pcapservice/tasks/main.yml     |  2 +-
 .../roles/metron_streaming/defaults/main.yml    |  2 +-
 .../roles/metron_streaming/tasks/main.yml       | 22 +++---
 deployment/roles/metron_streaming/vars/main.yml |  2 +
 deployment/roles/yaf/defaults/main.yml          |  2 +-
 .../metron/bolt/BulkMessageWriterBolt.java      |  2 +-
 .../src/main/bash/latency_summarizer.sh         | 32 ---------
 .../src/main/bash/start_topology.sh             | 22 ------
 .../src/main/bash/zk_load_configs.sh            | 33 ---------
 .../parsing/parsers/BasicSnortParser.java       | 16 ++---
 .../metron/parsing/parsers/GrokParser.java      | 38 ++++++++--
 .../src/main/resources/patterns/yaf             |  2 +-
 metron-streaming/Metron-Topologies/pom.xml      | 20 +++++-
 .../src/main/assembly/assembly.xml              | 74 +++++++++++++++++++
 .../src/main/bash/latency_summarizer.sh         | 32 +++++++++
 .../src/main/bash/start_topology.sh             | 22 ++++++
 .../src/main/bash/zk_load_configs.sh            | 33 +++++++++
 .../Metron_Configs/etc/env/config.properties    |  2 +-
 .../Metron_Configs/topologies/bro/remote.yaml   |  2 +-
 .../topologies/enrichment/remote.yaml           |  2 +-
 .../Metron_Configs/topologies/yaf/test.yaml     |  7 +-
 .../src/main/resources/SampleParsed/SnortParsed |  6 +-
 .../resources/SampleParsed/YafExampleParsed     | 20 +++---
 .../integration/EnrichmentIntegrationTest.java  | 75 ++++++++++----------
 .../integration/ParserIntegrationTest.java      |  1 +
 .../metron/integration/YafIntegrationTest.java  |  6 +-
 29 files changed, 299 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index c9d729b..488d7ea 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ target
 *.classpath
 *.settings
 *hbase-site.xml
+log4j.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/amazon-ec2/conf/defaults.yml
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/conf/defaults.yml b/deployment/amazon-ec2/conf/defaults.yml
index e3261b3..03557c8 100644
--- a/deployment/amazon-ec2/conf/defaults.yml
+++ b/deployment/amazon-ec2/conf/defaults.yml
@@ -36,10 +36,6 @@ tracker_hbase_table: access_tracker
 threatintel_ip_hbase_table: malicious_ip
 
 # kafka
-pycapa_topic: pcap
-bro_topic: bro
-yaf_topic: ipfix
-snort_topic: snort
 num_partitions: 3
 retention_in_gb: 25
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/roles/hadoop_setup/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/hadoop_setup/defaults/main.yml b/deployment/roles/hadoop_setup/defaults/main.yml
index c783cea..99a55f6 100644
--- a/deployment/roles/hadoop_setup/defaults/main.yml
+++ b/deployment/roles/hadoop_setup/defaults/main.yml
@@ -19,7 +19,7 @@ num_partitions: 1
 retention_in_gb: 10
 pycapa_topic: pcap
 bro_topic: bro
-yaf_topic: ipfix
+yaf_topic: yaf
 snort_topic: snort
 enrichments_topic: enrichments
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/roles/metron_pcapservice/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_pcapservice/tasks/main.yml b/deployment/roles/metron_pcapservice/tasks/main.yml
index 949c5a3..989e49b 100644
--- a/deployment/roles/metron_pcapservice/tasks/main.yml
+++ b/deployment/roles/metron_pcapservice/tasks/main.yml
@@ -58,6 +58,6 @@
   yum: name=daemonize
 
 - name: Start pcap_service
-  shell: "daemonize -c {{ metron_directory }} -e /var/log/metron_pcapservice.log -o /var/log/metron_pcapservice.log -l /var/lock/subsys/metron_pcapservice /usr/bin/java -cp {{ metron_directory }}/lib/{{ pcapservice_jar_name }} com.opensoc.pcapservice.rest.PcapService -p {{ pcapservice_port }}"
+  shell: "daemonize -c {{ metron_directory }} -e /var/log/metron_pcapservice.log -o /var/log/metron_pcapservice.log -l /var/lock/subsys/metron_pcapservice /usr/bin/java -cp {{ metron_directory }}/lib/{{ pcapservice_jar_name }} org.apache.metron.pcapservice.rest.PcapService -p {{ pcapservice_port }}"
   args:
     creates: /var/lock/subsys/metron_pcapservice

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/roles/metron_streaming/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/defaults/main.yml b/deployment/roles/metron_streaming/defaults/main.yml
index 21f9c7b..9c67a24 100644
--- a/deployment/roles/metron_streaming/defaults/main.yml
+++ b/deployment/roles/metron_streaming/defaults/main.yml
@@ -25,7 +25,7 @@ threat_intel_csv_filepath: "../roles/metron_streaming/templates/{{ threat_intel_
 
 pycapa_topic: pcap
 bro_topic: bro
-yaf_topic: ipfix
+yaf_topic: yaf
 snort_topic: snort
 enrichments_topic: enrichments
 storm_topologies:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index b1b8734..27b087c 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -22,18 +22,19 @@
       - { name: 'bin'}
       - { name: 'config'}
 
-- name: Copy Metron streaming jars
+
+- name: Copy Metron Topologies bundle
   copy:
-    src: "{{ metron_jar_path }}"
-    dest: "{{ metron_directory }}/lib/"
+    src: "{{ metron_topologies_bundle_path }}"
+    dest: "{{ metron_directory }}"
 
 - name: Copy Metron DataLoads bundle
   copy:
     src: "{{ metron_dataloads_path }}"
     dest: "{{ metron_directory }}"
 
-- name: Unbundle Metron DataLoads bundle
-  shell: cd {{ metron_directory }} && tar xzvf *.tar.gz
+- name: Unbundle Metron bundles
+  shell: cd {{ metron_directory }} && tar xzvf Metron-Topologies*.tar.gz && tar xzvf Metron-DataLoads*.tar.gz && rm *.tar.gz
 
 - name: Alternatives link for "java"
   alternatives: name={{ item.name }} link={{ item.link }}  path={{ item.path }}
@@ -50,14 +51,6 @@
 - name: Add hdfs-site.xml to topology jar
   shell: cd {{ hdfs_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} hdfs-site.xml
 
-- name: Copy Metron topology config files
-  copy:
-    src: "{{ metron_src_config_path }}/{{ item }}"
-    dest: "{{ metron_directory }}/config/"
-  with_items:
-    - "etc"
-    - "topologies"
-
 - name: Get Default mysql passowrd
   include_vars: "../roles/mysql_server/vars/main.yml"
   when: mysql_root_password is undefined
@@ -68,6 +61,9 @@
 - name: Create Metron HDFS output directory
   command: su - hdfs -c "hdfs dfs -mkdir -p {{ metron_hdfs_output_dir }} && hdfs dfs -chown hdfs:hadoop {{ metron_hdfs_output_dir }} && hdfs dfs -chmod 775 {{ metron_hdfs_output_dir }}"
 
+- name: Upload grok patterns
+  command: su - hdfs -c "hdfs dfs -mkdir -p {{ metron_hdfs_output_dir }}/patterns && hdfs dfs -put {{ metron_directory }}/config/patterns/*  {{ metron_hdfs_output_dir }}/patterns && hdfs dfs -chown -R hdfs:hadoop {{ metron_hdfs_output_dir }}/patterns && hdfs dfs -chmod -R 775 {{ metron_hdfs_output_dir }}/patterns"
+
 - name: Configure Metron topologies
   lineinfile: >
     dest={{ metron_properties_config_path }}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/roles/metron_streaming/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/vars/main.yml b/deployment/roles/metron_streaming/vars/main.yml
index 83379c6..b9664c9 100644
--- a/deployment/roles/metron_streaming/vars/main.yml
+++ b/deployment/roles/metron_streaming/vars/main.yml
@@ -19,7 +19,9 @@ metron_jar_name: Metron-Topologies-{{ metron_version }}.jar
 metron_jar_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/target/{{ metron_jar_name }}"
 metron_directory: /usr/metron/{{ metron_version }}
 metron_dataloads_name: Metron-DataLoads-{{ metron_version }}-archive.tar.gz
+metron_topologies_bundle_name: Metron-Topologies-{{ metron_version }}-archive.tar.gz
 metron_dataloads_path: "{{ playbook_dir }}/../../metron-streaming/Metron-DataLoads/target/{{ metron_dataloads_name }}"
+metron_topologies_bundle_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/target/{{ metron_topologies_bundle_name }}"
 metron_src_config_path: "{{ playbook_dir }}/../../metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs"
 metron_properties_config_path: "{{ metron_directory }}/config/etc/env/config.properties"
 elasticsearch_config_path: /etc/elasticsearch

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/deployment/roles/yaf/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/yaf/defaults/main.yml b/deployment/roles/yaf/defaults/main.yml
index 7891ad2..702b739 100644
--- a/deployment/roles/yaf/defaults/main.yml
+++ b/deployment/roles/yaf/defaults/main.yml
@@ -19,7 +19,7 @@
 fixbuf_version: 1.7.1
 yaf_version: 2.8.0
 yaf_home: /opt/yaf
-yaf_topic: ipfix
+yaf_topic: yaf
 hdp_repo_def: http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.3.2.0/hdp.repo
 yaf_bin: /usr/local/bin/yaf
 yafscii_bin: /usr/local/bin/yafscii

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
index d827536..e9a9237 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
@@ -64,7 +64,7 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
-    JSONObject message = (JSONObject) tuple.getValueByField("message");
+    JSONObject message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
     message.put("index." + bulkMessageWriter.getClass().getSimpleName().toLowerCase() + ".ts", "" + System.currentTimeMillis());
     String sourceType = TopologyUtils.getSourceType(message);
     SourceConfig configuration = configurations.get(sourceType);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh b/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
deleted file mode 100755
index 335c3fd..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-# 
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
-[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
-
-# Autodetect JAVA_HOME if not defined
-if [ -e /usr/libexec/bigtop-detect-javahome ]; then
-  . /usr/libexec/bigtop-detect-javahome
-elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
-  . /usr/lib/bigtop-utils/bigtop-detect-javahome
-fi
-export METRON_VERSION=0.1BETA
-export METRON_HOME=/usr/metron/$METRON_VERSION
-export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
-java -cp $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.LatencySummarizer "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh b/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
deleted file mode 100755
index 21626c2..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-# 
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-METRON_VERSION=0.1BETA
-METRON_HOME=/usr/metron/$METRON_VERSION
-TOPOLOGY_JAR=Metron-Topologies-$METRON_VERSION.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/config/topologies/$1/remote.yaml --filter $METRON_HOME/config/etc/env/config.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh b/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
deleted file mode 100755
index 4273b7d..0000000
--- a/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-# 
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
-[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
-
-# Autodetect JAVA_HOME if not defined
-if [ -e /usr/libexec/bigtop-detect-javahome ]; then
-  . /usr/libexec/bigtop-detect-javahome
-elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
-  . /usr/lib/bigtop-utils/bigtop-detect-javahome
-fi
-export METRON_VERSION=0.1BETA
-export METRON_HOME=/usr/metron/$METRON_VERSION
-export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
-export ZK_HOME=${ZK_HOME:-/usr/hdp/current/hbase-client}
-java -cp $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.SourceConfigUtils "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
index 27b05a4..fa97801 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/BasicSnortParser.java
@@ -23,10 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 
 @SuppressWarnings("serial")
 public class BasicSnortParser extends BasicParser {
@@ -43,11 +40,11 @@ public class BasicSnortParser extends BasicParser {
 			"sig_id",
 			"sig_rev",
 			"msg",
-			"proto",
-			"src",
-			"srcport",
-			"dst",
-			"dstport",
+			"protocol",
+			"ip_src_addr",
+			"ip_src_port",
+			"ip_dst_addr",
+			"ip_dst_port",
 			"ethsrc",
 			"ethdst",
 			"ethlen",
@@ -67,6 +64,7 @@ public class BasicSnortParser extends BasicParser {
 			"icmpseq"
 	};
 
+
 	/**
 	 * Snort alerts are received as CSV records
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokParser.java
index 9c7e6af..c07c301 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/GrokParser.java
@@ -17,11 +17,13 @@
  */
 package org.apache.metron.parsing.parsers;
 
+import com.google.common.io.Resources;
 import oi.thekraken.grok.api.Grok;
 import oi.thekraken.grok.api.Match;
 import oi.thekraken.grok.api.exception.GrokException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.parser.interfaces.MessageParser;
 import org.json.simple.JSONObject;
@@ -32,6 +34,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+import java.net.URL;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -51,11 +54,17 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
   private String dateFormat = "yyyy-MM-dd HH:mm:ss.S z";
   private TimeZone timeZone = TimeZone.getTimeZone("UTC");
 
+  private String metronHdfsHome = "/apps/metron";
   public GrokParser(String grokHdfsPath, String patterLabel) {
     this.grokHdfsPath = grokHdfsPath;
     this.patternLabel = patterLabel;
   }
 
+  public GrokParser withMetronHDFSHome(String home) {
+    this.metronHdfsHome= home;
+    return this;
+  }
+
   public GrokParser withTimestampField(String timestampField) {
     this.timestampField = timestampField;
     return this;
@@ -76,21 +85,36 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
     return this;
   }
 
+  public InputStream openInputStream(String streamName) throws IOException {
+    InputStream is = getClass().getResourceAsStream(streamName);
+    if(is == null) {
+      FileSystem fs = FileSystem.get(new Configuration());
+      Path path = new Path((metronHdfsHome != null && metronHdfsHome.length() > 0?metronHdfsHome + "/":"") + streamName);
+      if(fs.exists(path)) {
+        return fs.open(path);
+      }
+    }
+    return is;
+  }
+
   @Override
   public void init() {
     grok = new Grok();
     try {
-      InputStream commonInputStream = getClass().getResourceAsStream
-              ("/patterns/common");
+      InputStream commonInputStream = openInputStream("/patterns/common");
+      if(commonInputStream == null) {
+        throw new RuntimeException("Unable to initialize grok parser: Unable to load /patterns/common from either classpath or HDFS" );
+      }
       grok.addPatternFromReader(new InputStreamReader(commonInputStream));
-      InputStream patterInputStream = FileSystem.get(new Configuration()).open(new
-              Path(grokHdfsPath));
+      InputStream patterInputStream = openInputStream(grokHdfsPath);
+      if(patterInputStream == null) {
+        throw new RuntimeException("Unable to initialize grok parser: Unable to load " + grokHdfsPath + " from either classpath or HDFS" );
+      }
       grok.addPatternFromReader(new InputStreamReader(patterInputStream));
       grok.compile("%{" + patternLabel + "}");
-    } catch (GrokException e) {
+    } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
-    } catch (IOException e) {
-      e.printStackTrace();
+      throw new RuntimeException("Grok parser Error: " + e.getMessage(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-MessageParsers/src/main/resources/patterns/yaf
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/resources/patterns/yaf b/metron-streaming/Metron-MessageParsers/src/main/resources/patterns/yaf
index 8fc130e..c664586 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/resources/patterns/yaf
+++ b/metron-streaming/Metron-MessageParsers/src/main/resources/patterns/yaf
@@ -1,2 +1,2 @@
 YAF_TIME_FORMAT %{YEAR:UNWANTED}-%{MONTHNUM:UNWANTED}-%{MONTHDAY:UNWANTED}[T ]%{HOUR:UNWANTED}:%{MINUTE:UNWANTED}:%{SECOND:UNWANTED}
-YAF_DELIMITED %{YAF_TIME_FORMAT:start_time}\|%{YAF_TIME_FORMAT:end_time}\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\|%{SPACE:UNWANTED}%{INT:proto}\|%{SPACE:UNWANTED}%{IP:sip}\|%{SPACE:UNWANTED}%{INT:sp}\|%{SPACE:UNWANTED}%{IP:dip}\|%{SPACE:UNWANTED}%{INT:dp}\|%{SPACE:UNWANTED}%{DATA:iflags}\|%{SPACE:UNWANTED}%{DATA:uflags}\|%{SPACE:UNWANTED}%{DATA:riflags}\|%{SPACE:UNWANTED}%{DATA:ruflags}\|%{SPACE:UNWANTED}%{WORD:isn}\|%{SPACE:UNWANTED}%{DATA:risn}\|%{SPACE:UNWANTED}%{DATA:tag}\|%{GREEDYDATA:rtag}\|%{SPACE:UNWANTED}%{INT:pkt}\|%{SPACE:UNWANTED}%{INT:oct}\|%{SPACE:UNWANTED}%{INT:rpkt}\|%{SPACE:UNWANTED}%{INT:roct}\|%{SPACE:UNWANTED}%{INT:app}\|%{GREEDYDATA:end_reason}
\ No newline at end of file
+YAF_DELIMITED %{YAF_TIME_FORMAT:start_time}\|%{YAF_TIME_FORMAT:end_time}\|%{SPACE:UNWANTED}%{BASE10NUM:duration}\|%{SPACE:UNWANTED}%{BASE10NUM:rtt}\|%{SPACE:UNWANTED}%{INT:protocol}\|%{SPACE:UNWANTED}%{IP:ip_src_addr}\|%{SPACE:UNWANTED}%{INT:ip_src_port}\|%{SPACE:UNWANTED}%{IP:ip_dst_addr}\|%{SPACE:UNWANTED}%{INT:ip_dst_port}\|%{SPACE:UNWANTED}%{DATA:iflags}\|%{SPACE:UNWANTED}%{DATA:uflags}\|%{SPACE:UNWANTED}%{DATA:riflags}\|%{SPACE:UNWANTED}%{DATA:ruflags}\|%{SPACE:UNWANTED}%{WORD:isn}\|%{SPACE:UNWANTED}%{DATA:risn}\|%{SPACE:UNWANTED}%{DATA:tag}\|%{GREEDYDATA:rtag}\|%{SPACE:UNWANTED}%{INT:pkt}\|%{SPACE:UNWANTED}%{INT:oct}\|%{SPACE:UNWANTED}%{INT:rpkt}\|%{SPACE:UNWANTED}%{INT:roct}\|%{SPACE:UNWANTED}%{INT:app}\|%{GREEDYDATA:end_reason}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml
index 4530e6d..9077b82 100644
--- a/metron-streaming/Metron-Topologies/pom.xml
+++ b/metron-streaming/Metron-Topologies/pom.xml
@@ -235,7 +235,8 @@
                         </configuration>
                     </execution>
                 </executions>
-            </plugin>
+            </plugin> 
+            
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
@@ -274,6 +275,23 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/assembly/assembly.xml b/metron-streaming/Metron-Topologies/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..fee1238
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/assembly/assembly.xml
@@ -0,0 +1,74 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/bash</directory>
+      <outputDirectory>/bin</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/src/main/resources/Metron_Configs/etc</directory>
+      <outputDirectory>/config/etc</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/src/main/resources/Metron_Configs/topologies</directory>
+      <outputDirectory>/config/topologies</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/../Metron-MessageParsers/src/main/resources/patterns</directory>
+      <outputDirectory>/config/patterns</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+      <lineEnding>unix</lineEnding>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/bash/latency_summarizer.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/bash/latency_summarizer.sh b/metron-streaming/Metron-Topologies/src/main/bash/latency_summarizer.sh
new file mode 100755
index 0000000..335c3fd
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/bash/latency_summarizer.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
+java -cp $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.LatencySummarizer "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/bash/start_topology.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/bash/start_topology.sh b/metron-streaming/Metron-Topologies/src/main/bash/start_topology.sh
new file mode 100755
index 0000000..21626c2
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/bash/start_topology.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+METRON_VERSION=0.1BETA
+METRON_HOME=/usr/metron/$METRON_VERSION
+TOPOLOGY_JAR=Metron-Topologies-$METRON_VERSION.jar
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/config/topologies/$1/remote.yaml --filter $METRON_HOME/config/etc/env/config.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/bash/zk_load_configs.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/bash/zk_load_configs.sh b/metron-streaming/Metron-Topologies/src/main/bash/zk_load_configs.sh
new file mode 100755
index 0000000..4273b7d
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/bash/zk_load_configs.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
+export ZK_HOME=${ZK_HOME:-/usr/hdp/current/hbase-client}
+java -cp $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.SourceConfigUtils "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
index 1f61609..9f66a95 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
@@ -27,7 +27,7 @@ spout.kafka.topic.lancope=lancope
 spout.kafka.topic.paloalto=paloalto
 spout.kafka.topic.pcap=pcap
 spout.kafka.topic.snort=snort
-spout.kafka.topic.yaf=ipfix
+spout.kafka.topic.yaf=yaf
 
 ##### ElasticSearch #####
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
index 42412fa..34670f8 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/bro/remote.yaml
@@ -44,7 +44,7 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -2
+                value: -1
             -   name: "socketTimeoutMs"
                 value: 1000000
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index 45d8a50..d3961bf 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -148,7 +148,7 @@ components:
             -   name: "ignoreZkOffsets"
                 value: true
             -   name: "startOffsetTime"
-                value: -2
+                value: -1
 
 spouts:
     -   id: "kafkaSpout"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
index fe764d4..f478777 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/yaf/test.yaml
@@ -35,6 +35,9 @@ components:
             -   name: "withDateFormat"
                 args:
                     - "yyyy-MM-dd HH:mm:ss.S"
+            -   name: "withMetronHDFSHome"
+                args:
+                    - ""
     -   id: "writer"
         className: "org.apache.metron.writer.KafkaWriter"
         constructorArgs:
@@ -56,7 +59,7 @@ components:
             - "${spout.kafka.topic.yaf}"
         properties:
             -   name: "ignoreZkOffsets"
-                value: true
+                value: false
             -   name: "startOffsetTime"
                 value: -2
             -   name: "socketTimeoutMs"
@@ -73,7 +76,7 @@ bolts:
         className: "org.apache.metron.bolt.ParserBolt"
         constructorArgs:
             - "${kafka.zk}"
-            - "yaf"
+            - "${spout.kafka.topic.yaf}"
             - ref: "parser"
             - ref: "writer"
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
index 86236ea..9b99cb7 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/SnortParsed
@@ -1,3 +1,3 @@
-{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"22","ethsrc":"52:54:00:12:35:02","tcpseq":"0x9AFF3D7","dgmlen":"64","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0xC8761D52","original_string":"01\/27-16:01:04.877970 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,10.0.2.2,56642,10.0.2.15,22,52:54:00:12:35:02,08:00:27:7F:93:2D,0x4E,***AP***,0x9AFF3D7,0xC8761D52,,0xFFFF,64,0,59677,64,65536,,,,","icmpcode":"","tos":"0","id":"59677","timestamp":1453932941970,"ethdst":"08:00:27:7F:93:2D","src":"10.0.2.2","ttl":"64","source.type":"test","ethlen":"0x4E","iplen":"65536","icmptype":"","proto":"TCP","srcport":"56642","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
-{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB45F7A","dgmlen":"96","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.612494 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0x6E,***AP***,0xDB45F7A,0x7701DD5B,,0xFFFF,64,0,16785,96,98304,,,,","icmpcode":"","tos":"0","id":"16785","timestamp":1456178820494,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0x6E","iplen":"98304","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
-{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","dst":"10.0.2.15","dstport":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB508F2","dgmlen":"152","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.616775 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0xA6,***AP***,0xDB508F2,0x7701DD5B,,0xFFFF,64,0,16824,152,155648,,,,","icmpcode":"","tos":"0","id":"16824","timestamp":1456178824775,"ethdst":"08:00:27:7F:93:2D","src":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0xA6","iplen":"155648","icmptype":"","proto":"TCP","srcport":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","ip_dst_addr":"10.0.2.15","ip_dst_port":"22","ethsrc":"52:54:00:12:35:02","tcpseq":"0x9AFF3D7","dgmlen":"64","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0xC8761D52","original_string":"01\/27-16:01:04.877970 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,10.0.2.2,56642,10.0.2.15,22,52:54:00:12:35:02,08:00:27:7F:93:2D,0x4E,***AP***,0x9AFF3D7,0xC8761D52,,0xFFFF,64,0,59677,64,65536,,,,","icmpcode":"","tos":"0","id":"59677","timestamp":1453932941970,"ethdst":"08:00:27:7F:93:2D","ip_src_addr":"10.0.2.2","ttl":"64","source.type":"test","ethlen":"0x4E","iplen":"65536","icmptype":"","protocol":"TCP","ip_src_port":"56642","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","ip_dst_addr":"10.0.2.15","ip_dst_port":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB45F7A","dgmlen":"96","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.612494 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0x6E,***AP***,0xDB45F7A,0x7701DD5B,,0xFFFF,64,0,16785,96,98304,,,,","icmpcode":"","tos":"0","id":"16785","timestamp":1456178820494,"ethdst":"08:00:27:7F:93:2D","ip_src_addr":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0x6E","iplen":"98304","icmptype":"","protocol":"TCP","ip_src_port":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}
+{"msg":"\"Consecutive TCP small segments exceeding threshold\"","sig_rev":"1","ip_dst_addr":"10.0.2.15","ip_dst_port":"50895","ethsrc":"52:54:00:12:35:02","tcpseq":"0xDB508F2","dgmlen":"152","icmpid":"","tcplen":"","tcpwindow":"0xFFFF","icmpseq":"","tcpack":"0x7701DD5B","original_string":"02\/22-15:56:48.616775 ,129,12,1,\"Consecutive TCP small segments exceeding threshold\",TCP,96.44.142.5,80,10.0.2.15,50895,52:54:00:12:35:02,08:00:27:7F:93:2D,0xA6,***AP***,0xDB508F2,0x7701DD5B,,0xFFFF,64,0,16824,152,155648,,,,","icmpcode":"","tos":"0","id":"16824","timestamp":1456178824775,"ethdst":"08:00:27:7F:93:2D","ip_src_addr":"96.44.142.5","ttl":"64","source.type":"test","ethlen":"0xA6","iplen":"155648","icmptype":"","protocol":"TCP","ip_src_port":"80","tcpflags":"***AP***","sig_id":"12","sig_generator":"129", "is_alert" : "true"}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
index 57f07b1..bbb90f3 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
+++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleParsed/YafExampleParsed
@@ -1,10 +1,10 @@
-{"iflags":"AS","uflags":0,"isn":"22efa001","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988512,"app":0,"oct":44,"end_reason":"idle","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
-{"iflags":"A","uflags":0,"isn":10000000,"dip":"10.0.2.3","dp":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":37299,"timestamp":1453994988502,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988502,"source.type":"yaf","start_time":1453994988502,"riflags":0,"rtt":"0.000","proto":17}
-{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.15","dp":37299,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.3","tag":0,"rtag":0,"sp":53,"timestamp":1453994988504,"app":0,"oct":312,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","proto":17}
-{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.3","dp":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":56303,"timestamp":1453994988504,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"yaf","start_time":1453994988504,"riflags":0,"rtt":"0.000","proto":17}
-{"iflags":"A","uflags":0,"isn":0,"dip":"10.0.2.15","dp":56303,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.3","tag":0,"rtag":0,"sp":53,"timestamp":1453994988506,"app":0,"oct":84,"end_reason":"idle","risn":0,"end_time":1453994988506,"source.type":"yaf","start_time":1453994988506,"riflags":0,"rtt":"0.000","proto":17}
-{"iflags":"S","uflags":0,"isn":"58c52fca","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988508,"app":0,"oct":60,"end_reason":"idle","risn":0,"end_time":1453994988508,"source.type":"yaf","start_time":1453994988508,"riflags":0,"rtt":"0.000","proto":6}
-{"iflags":"A","uflags":0,"isn":"58c52fcb","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
-{"iflags":"AP","uflags":0,"isn":"58c52fcb","dip":"216.21.170.221","dp":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"10.0.2.15","tag":0,"rtag":0,"sp":39468,"timestamp":1453994988512,"app":0,"oct":148,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
-{"iflags":"A","uflags":0,"isn":"22efa002","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"yaf","start_time":1453994988512,"riflags":0,"rtt":"0.000","proto":6}
-{"iflags":"AP","uflags":0,"isn":"22efa002","dip":"10.0.2.15","dp":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"sip":"216.21.170.221","tag":0,"rtag":0,"sp":80,"timestamp":1453994988562,"app":0,"oct":604,"end_reason":"idle","risn":0,"end_time":1453994988562,"source.type":"yaf","start_time":1453994988562,"riflags":0,"rtt":"0.000","proto":6}
+{"iflags":"AS","uflags":0,"isn":"22efa001","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AS|       0|       0|       0|22efa001|00000000|000|000|       1|      44|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988512,"app":0,"oct":44,"end_reason":"idle","risn":0,"end_time":1453994988512,"source.type":"test","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
+{"iflags":"A","uflags":0,"isn":10000000,"ip_dst_addr":"10.0.2.3","ip_dst_port":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.502|2016-01-28 15:29:48.502|   0.000|   0.000| 17|                               10.0.2.15|37299|                                10.0.2.3|   53|       A|       0|       0|       0|10000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":37299,"timestamp":1453994988502,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988502,"source.type":"test","start_time":1453994988502,"riflags":0,"rtt":"0.000","protocol":17}
+{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.15","ip_dst_port":37299,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|37299|       A|       0|       0|       0|00000000|00000000|000|000|       1|     312|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.3","tag":0,"rtag":0,"ip_src_port":53,"timestamp":1453994988504,"app":0,"oct":312,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"test","start_time":1453994988504,"riflags":0,"rtt":"0.000","protocol":17}
+{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.3","ip_dst_port":53,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.504|2016-01-28 15:29:48.504|   0.000|   0.000| 17|                               10.0.2.15|56303|                                10.0.2.3|   53|       A|       0|       0|       0|00000000|00000000|000|000|       1|      56|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":56303,"timestamp":1453994988504,"app":0,"oct":56,"end_reason":"idle","risn":0,"end_time":1453994988504,"source.type":"test","start_time":1453994988504,"riflags":0,"rtt":"0.000","protocol":17}
+{"iflags":"A","uflags":0,"isn":0,"ip_dst_addr":"10.0.2.15","ip_dst_port":56303,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.506|2016-01-28 15:29:48.506|   0.000|   0.000| 17|                                10.0.2.3|   53|                               10.0.2.15|56303|       A|       0|       0|       0|00000000|00000000|000|000|       1|      84|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.3","tag":0,"rtag":0,"ip_src_port":53,"timestamp":1453994988506,"app":0,"oct":84,"end_reason":"idle","risn":0,"end_time":1453994988506,"source.type":"test","start_time":1453994988506,"riflags":0,"rtt":"0.000","protocol":17}
+{"iflags":"S","uflags":0,"isn":"58c52fca","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.508|2016-01-28 15:29:48.508|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       S|       0|       0|       0|58c52fca|00000000|000|000|       1|      60|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988508,"app":0,"oct":60,"end_reason":"idle","risn":0,"end_time":1453994988508,"source.type":"test","start_time":1453994988508,"riflags":0,"rtt":"0.000","protocol":6}
+{"iflags":"A","uflags":0,"isn":"58c52fcb","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|       A|       0|       0|       0|58c52fcb|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"test","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
+{"iflags":"AP","uflags":0,"isn":"58c52fcb","ip_dst_addr":"216.21.170.221","ip_dst_port":80,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                               10.0.2.15|39468|                          216.21.170.221|   80|      AP|       0|       0|       0|58c52fcb|00000000|000|000|       1|     148|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"10.0.2.15","tag":0,"rtag":0,"ip_src_port":39468,"timestamp":1453994988512,"app":0,"oct":148,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"test","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
+{"iflags":"A","uflags":0,"isn":"22efa002","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.512|2016-01-28 15:29:48.512|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|       A|       0|       0|       0|22efa002|00000000|000|000|       1|      40|       0|       0|    0|idle ","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988512,"app":0,"oct":40,"end_reason":"idle ","risn":0,"end_time":1453994988512,"source.type":"test","start_time":1453994988512,"riflags":0,"rtt":"0.000","protocol":6}
+{"iflags":"AP","uflags":0,"isn":"22efa002","ip_dst_addr":"10.0.2.15","ip_dst_port":39468,"duration":"0.000","rpkt":0,"original_string":"2016-01-28 15:29:48.562|2016-01-28 15:29:48.562|   0.000|   0.000|  6|                          216.21.170.221|   80|                               10.0.2.15|39468|      AP|       0|       0|       0|22efa002|00000000|000|000|       1|     604|       0|       0|    0|idle","pkt":1,"ruflags":0,"roct":0,"ip_src_addr":"216.21.170.221","tag":0,"rtag":0,"ip_src_port":80,"timestamp":1453994988562,"app":0,"oct":604,"end_reason":"idle","risn":0,"end_time":1453994988562,"source.type":"test","start_time":1453994988562,"riflags":0,"rtt":"0.000","protocol":6}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index 4d5f897..1aa729c 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -50,7 +50,8 @@ import java.text.SimpleDateFormat;
 import java.util.*;
 
 public class EnrichmentIntegrationTest {
-
+  private static final String SRC_IP = "ip_src_addr";
+  private static final String DST_IP = "ip_dst_addr";
   private String fluxPath = "src/main/resources/Metron_Configs/topologies/enrichment/test.yaml";
   private String indexDir = "target/elasticsearch";
   private String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
@@ -129,21 +130,21 @@ public class EnrichmentIntegrationTest {
   public void test() throws Exception {
     cleanHdfsDir(hdfsDir);
     final String dateFormat = "yyyy.MM.dd.hh";
-    final String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
+    final String index = "test_index_" + new SimpleDateFormat(dateFormat).format(new Date());
     String yafConfig = "{\n" +
-            "  \"index\": \"yaf\",\n" +
+            "  \"index\": \"test\",\n" +
             "  \"batchSize\": 5,\n" +
             "  \"enrichmentFieldMap\":\n" +
             "  {\n" +
-            "    \"geo\": [\"sip\", \"dip\"],\n" +
-            "    \"host\": [\"sip\", \"dip\"]\n" +
+            "    \"geo\": [\"" + SRC_IP + "\", \"" + DST_IP + "\"],\n" +
+            "    \"host\": [\"" + SRC_IP + "\", \"" + DST_IP + "\"]\n" +
             "  },\n" +
             "  \"threatIntelFieldMap\":\n" +
             "  {\n" +
-            "    \"ip\": [\"sip\", \"dip\"]\n" +
+            "    \"ip\": [\"" + SRC_IP + "\", \"" + DST_IP + "\"]\n" +
             "  }\n" +
             "}";
-    sourceConfigs.put("yaf", yafConfig);
+    sourceConfigs.put("test", yafConfig);
     final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
     final String cf = "cf";
     final String trackerHBaseTable = "tracker";
@@ -222,7 +223,7 @@ public class EnrichmentIntegrationTest {
                   if (elasticSearchComponent.hasIndex(index)) {
                     List<Map<String, Object>> docsFromDisk;
                     try {
-                      docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
+                      docs = elasticSearchComponent.getAllIndexedDocs(index, "test_doc");
                       docsFromDisk = readDocsFromDisk(hdfsDir);
                       System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
                     } catch (IOException e) {
@@ -257,7 +258,7 @@ public class EnrichmentIntegrationTest {
       Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
 
       Assert.assertEquals(new File(hdfsDir).list().length, 1);
-      Assert.assertEquals(new File(hdfsDir).list()[0], "yaf_doc");
+      Assert.assertEquals(new File(hdfsDir).list()[0], "test_doc");
       for (Map<String, Object> doc : docsFromDisk) {
         baseValidation(doc);
         hostEnrichmentValidation(doc);
@@ -280,8 +281,8 @@ public class EnrichmentIntegrationTest {
       Assert.assertTrue(kv.getValue().toString().length() > 0);
     }
     //ensure we always have a source ip and destination ip
-    Assert.assertNotNull(jsonDoc.get("sip"));
-    Assert.assertNotNull(jsonDoc.get("dip"));
+    Assert.assertNotNull(jsonDoc.get(SRC_IP));
+    Assert.assertNotNull(jsonDoc.get(DST_IP));
   }
 
   private static class EvaluationPayload {
@@ -368,11 +369,11 @@ public class EnrichmentIntegrationTest {
     }
     //ip threat intels
     if(keyPatternExists("threatintels.ip.", indexedDoc)) {
-      if(indexedDoc.get("sip").equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get("threatintels.ip.sip.ip_threat_intel"), "alert");
+      if(indexedDoc.get(SRC_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.ip." + SRC_IP + ".ip_threat_intel"), "alert");
       }
-      else if(indexedDoc.get("dip").equals("10.0.2.3")) {
-        Assert.assertEquals(indexedDoc.get("threatintels.ip.dip.ip_threat_intel"), "alert");
+      else if(indexedDoc.get(DST_IP).equals("10.0.2.3")) {
+        Assert.assertEquals(indexedDoc.get("threatintels.ip." + DST_IP + ".ip_threat_intel"), "alert");
       }
       else {
         Assert.fail("There was a threat intels that I did not expect.");
@@ -383,20 +384,20 @@ public class EnrichmentIntegrationTest {
 
   private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) {
     //should have geo enrichment on every message due to mock geo adapter
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.city"), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.city"), MockGeoAdapter.DEFAULT_CITY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.country"), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.country"), MockGeoAdapter.DEFAULT_COUNTRY);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.dip.postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
-    Assert.assertEquals(indexedDoc.get("enrichments.geo.sip.postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP +".location_point"), MockGeoAdapter.DEFAULT_LOCATION_POINT);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude"), MockGeoAdapter.DEFAULT_LONGITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".city"), MockGeoAdapter.DEFAULT_CITY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude"), MockGeoAdapter.DEFAULT_LATITUDE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".country"), MockGeoAdapter.DEFAULT_COUNTRY);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode"), MockGeoAdapter.DEFAULT_DMACODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
+    Assert.assertEquals(indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode"), MockGeoAdapter.DEFAULT_POSTAL_CODE);
   }
 
   private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) {
@@ -404,20 +405,20 @@ public class EnrichmentIntegrationTest {
     //important local printers
     {
       Set<String> ips = setOf("10.0.2.15", "10.60.10.254");
-      if (ips.contains(indexedDoc.get("sip"))) {
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
         //this is a local, important, printer
         Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
                 ,HostEnrichments.IMPORTANT
                 ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, "sip"))
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
         );
         enriched = true;
       }
-      if (ips.contains(indexedDoc.get("dip"))) {
+      if (ips.contains(indexedDoc.get(DST_IP))) {
         Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
                 ,HostEnrichments.IMPORTANT
                 ,HostEnrichments.PRINTER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, "dip"))
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
         );
         enriched = true;
       }
@@ -425,20 +426,20 @@ public class EnrichmentIntegrationTest {
     //important local webservers
     {
       Set<String> ips = setOf("10.1.128.236");
-      if (ips.contains(indexedDoc.get("sip"))) {
+      if (ips.contains(indexedDoc.get(SRC_IP))) {
         //this is a local, important, printer
         Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
                 ,HostEnrichments.IMPORTANT
                 ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, "sip"))
+                ).apply(new EvaluationPayload(indexedDoc, SRC_IP))
         );
         enriched = true;
       }
-      if (ips.contains(indexedDoc.get("dip"))) {
+      if (ips.contains(indexedDoc.get(DST_IP))) {
         Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION
                 ,HostEnrichments.IMPORTANT
                 ,HostEnrichments.WEBSERVER_TYPE
-                ).apply(new EvaluationPayload(indexedDoc, "dip"))
+                ).apply(new EvaluationPayload(indexedDoc, DST_IP))
         );
         enriched = true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
index 80688b7..3c6972a 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/ParserIntegrationTest.java
@@ -83,6 +83,7 @@ public abstract class ParserIntegrationTest {
             .withComponent("kafka", kafkaComponent)
             .withComponent("storm", fluxComponent)
             .withMillisecondsBetweenAttempts(5000)
+            .withNumRetries(10)
             .build();
     runner.start();
     fluxComponent.submitTopology();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/f32af016/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
index cf91bea..f255a0a 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/YafIntegrationTest.java
@@ -45,12 +45,12 @@ public class YafIntegrationTest extends ParserIntegrationTest {
             " \"batchSize\": 5," +
             " \"enrichmentFieldMap\":" +
             "  {" +
-            "    \"geo\": [\"sip\", \"dip\"]," +
-            "    \"host\": [\"sip\", \"dip\"]" +
+            "    \"geo\": [\"ip_src_addr\", \"ip_dst_addr\"]," +
+            "    \"host\": [\"ip_src_addr\", \"ip_dst_addr\"]" +
             "  }," +
             "  \"threatIntelFieldMap\":" +
             "  {" +
-            "    \"ip\": [\"sip\", \"dip\"]" +
+            "    \"ip\": [\"ip_src_addr\", \"ip_dst_addr\"]" +
             "  }" +
             "}";
   }