You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/08 22:45:54 UTC

incubator-metron git commit: METRON-62 Add writing to enriched data back to the enrichment topology closes apache/incubator-metron#39

Repository: incubator-metron
Updated Branches:
  refs/heads/master 560f7abe7 -> d043f2453


METRON-62 Add writing to enriched data back to the enrichment topology closes apache/incubator-metron#39


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/d043f245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/d043f245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/d043f245

Branch: refs/heads/master
Commit: d043f24536d14fd1421d5b72af4a7dd92c69ec38
Parents: 560f7ab
Author: cestella <ce...@gmail.com>
Authored: Tue Mar 8 16:45:45 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 8 16:45:45 2016 -0500

----------------------------------------------------------------------
 .../roles/metron_streaming/tasks/main.yml       |  13 ++
 .../metron_streaming/tasks/threat_intel.yml     |   2 -
 deployment/roles/metron_streaming/vars/main.yml |   2 +
 .../metron/bolt/BulkMessageWriterBolt.java      |   3 +-
 .../java/org/apache/metron/utils/JSONUtils.java |   8 +
 .../writer/interfaces/BulkMessageWriter.java    |   3 +-
 .../src/main/bash/latency_summarizer.sh         |  32 ++++
 .../src/main/bash/start_topology.sh             |  22 +++
 .../src/main/bash/zk_load_configs.sh            |  33 ++++
 .../enrichment/bolt/EnrichmentSplitterBolt.java |   4 +-
 metron-streaming/Metron-Indexing/pom.xml        |  16 +-
 .../metron/writer/ElasticsearchWriter.java      |   2 +-
 .../org/apache/metron/writer/HdfsWriter.java    |  44 -----
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  94 ++++++++++
 .../writer/hdfs/SourceAwareMoveAction.java      |  48 +++++
 .../writer/hdfs/SourceFileNameFormat.java       |  48 +++++
 .../metron/writer/hdfs/SourceHandler.java       | 160 ++++++++++++++++
 .../apache/metron/utils/LatencySummarizer.java  | 188 +++++++++++++++++++
 .../apache/metron/utils/SourceConfigUtils.java  |  61 ++++--
 .../Metron_Configs/etc/env/config.properties    |   2 +-
 .../topologies/enrichment/remote.yaml           |  33 ++++
 .../topologies/enrichment/test.yaml             |  34 ++++
 .../integration/EnrichmentIntegrationTest.java  | 153 +++++++++++----
 23 files changed, 895 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index ad1e081..b1b8734 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -44,6 +44,12 @@
 - name: Add hbase-site.xml to topology jar
   shell: cd {{ hbase_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} hbase-site.xml
 
+- name: Add core-site.xml to topology jar
+  shell: cd {{ hdfs_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} core-site.xml
+
+- name: Add hdfs-site.xml to topology jar
+  shell: cd {{ hdfs_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} hdfs-site.xml
+
 - name: Copy Metron topology config files
   copy:
     src: "{{ metron_src_config_path }}/{{ item }}"
@@ -56,6 +62,12 @@
   include_vars: "../roles/mysql_server/vars/main.yml"
   when: mysql_root_password is undefined
 
+- name: Create root user HDFS directory
+  command: su - hdfs -c "hdfs dfs -mkdir -p /user/root && hdfs dfs -chown root:root /user/root"
+
+- name: Create Metron HDFS output directory
+  command: su - hdfs -c "hdfs dfs -mkdir -p {{ metron_hdfs_output_dir }} && hdfs dfs -chown hdfs:hadoop {{ metron_hdfs_output_dir }} && hdfs dfs -chmod 775 {{ metron_hdfs_output_dir }}"
+
 - name: Configure Metron topologies
   lineinfile: >
     dest={{ metron_properties_config_path }}
@@ -77,6 +89,7 @@
     - { regexp: "threat.intel.ip.cf=", line: "threat.intel.ip.cf=t" }
     - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.mysql[0] }}" }
     - { regexp: "mysql.password=", line: "mysql.password={{ mysql_root_password }}" }
+    - { regexp: "index.hdfs.output=", line: "index.hdfs.output={{ metron_hdfs_output_dir }}/enrichment/indexed" }
 
 - name: Add Elasticsearch templates for topologies
   uri:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/deployment/roles/metron_streaming/tasks/threat_intel.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/threat_intel.yml b/deployment/roles/metron_streaming/tasks/threat_intel.yml
index 0439e46..6798319 100644
--- a/deployment/roles/metron_streaming/tasks/threat_intel.yml
+++ b/deployment/roles/metron_streaming/tasks/threat_intel.yml
@@ -15,8 +15,6 @@
 #  limitations under the License.
 #
 ---
-- name: Create root user HDFS directory
-  command: su - hdfs -c "hdfs dfs -mkdir -p /user/root && hdfs dfs -chown root:root /user/root"
 
 - name: Create Bulk load working Directory
   file:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/deployment/roles/metron_streaming/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/vars/main.yml b/deployment/roles/metron_streaming/vars/main.yml
index b93a70f..83379c6 100644
--- a/deployment/roles/metron_streaming/vars/main.yml
+++ b/deployment/roles/metron_streaming/vars/main.yml
@@ -26,3 +26,5 @@ elasticsearch_config_path: /etc/elasticsearch
 elasticsearch_cluster_name: metron
 elasticsearch_transport_port: 9300
 hbase_config_path: "/etc/hbase/conf"
+hdfs_config_path: "/etc/hadoop/conf"
+metron_hdfs_output_dir: "/apps/metron"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
index a8fda69..d827536 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
@@ -45,7 +45,6 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
   private Map<String, List<Tuple>> sourceTupleMap = new HashMap<>();
   private Map<String, List<JSONObject>> sourceMessageMap = new HashMap<>();
 
-
   public BulkMessageWriterBolt(String zookeeperUrl) {
     super(zookeeperUrl);
   }
@@ -59,7 +58,7 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     this.collector = collector;
     super.prepare(stormConf, context, collector);
-    bulkMessageWriter.init();
+    bulkMessageWriter.init(stormConf);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
index d704908..93b0a58 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -19,6 +19,7 @@
 package org.apache.metron.utils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
@@ -51,6 +52,13 @@ public enum JSONUtils {
     }
   };
 
+  public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+  public <T> T load(String is, TypeReference<T> ref) throws IOException {
+    return _mapper.get().readValue(is, ref);
+  }
+
   public <T> T load(InputStream is, Class<T> clazz) throws IOException {
     return _mapper.get().readValue(is, clazz);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
index 90c0261..9b627e6 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
@@ -21,10 +21,11 @@ import backtype.storm.tuple.Tuple;
 import org.apache.metron.domain.SourceConfig;
 
 import java.util.List;
+import java.util.Map;
 
 public interface BulkMessageWriter<T> extends AutoCloseable {
 
-  void init();
+  void init(Map stormConf);
   void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<T> messages) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh b/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
new file mode 100755
index 0000000..335c3fd
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
+java -cp $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.LatencySummarizer "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh b/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
new file mode 100755
index 0000000..21626c2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+METRON_VERSION=0.1BETA
+METRON_HOME=/usr/metron/$METRON_VERSION
+TOPOLOGY_JAR=Metron-Topologies-$METRON_VERSION.jar
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/config/topologies/$1/remote.yaml --filter $METRON_HOME/config/etc/env/config.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh b/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
new file mode 100755
index 0000000..4273b7d
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
+export ZK_HOME=${ZK_HOME:-/usr/hdp/current/hbase-client}
+java -cp $METRON_HOME/lib/$TOPOLOGIES_JAR org.apache.metron.utils.SourceConfigUtils "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 51508d8..7970674 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -82,12 +82,13 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
             byte[] data = tuple.getBinary(0);
             try {
                 message = (JSONObject) parser.parse(new String(data, "UTF8"));
+                message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
             } catch (ParseException | UnsupportedEncodingException e) {
                 e.printStackTrace();
             }
         } else {
             message = (JSONObject) tuple.getValueByField(messageFieldName);
-            message.put(getClass().getSimpleName().toLowerCase() + ".splitter.ts", "" + System.currentTimeMillis());
+            message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
         }
         return message;
     }
@@ -119,6 +120,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
                 streamMessageMap.put(enrichmentType, enrichmentObject);
             }
         }
+        message.put(getClass().getSimpleName().toLowerCase() + ".splitter.end.ts", "" + System.currentTimeMillis());
         return streamMessageMap;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
index e2a1037..1f5d04d 100644
--- a/metron-streaming/Metron-Indexing/pom.xml
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -92,7 +92,21 @@
             <version>1.9</version>
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.storm</groupId>
+                    <artifactId>storm-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
     <reporting>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index 2769efe..e8d654d 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -60,7 +60,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   }
 
   @Override
-  public void init() {
+  public void init(Map stormConf) {
     ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
     builder.put("cluster.name", clusterName);
     builder.put("client.transport.ping_timeout","500s");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java
deleted file mode 100644
index eace952..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
-import org.apache.metron.writer.interfaces.BulkMessageWriter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
-
-  @Override
-  public void init() {
-
-  }
-
-  @Override
-  public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
-
-  }
-
-  @Override
-  public void close() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
new file mode 100644
index 0000000..d2cc827
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  List<RotationAction> rotationActions = new ArrayList<>();
+  FileRotationPolicy rotationPolicy = new NoRotationPolicy();
+  SyncPolicy syncPolicy = new CountSyncPolicy(1); //sync every time, duh.
+  FileNameFormat fileNameFormat;
+  Map<String, SourceHandler> sourceHandlerMap = new HashMap<>();
+  transient Map stormConfig;
+  public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
+    this.fileNameFormat = fileNameFormat;
+    return this;
+  }
+
+  public HdfsWriter withSyncPolicy(SyncPolicy syncPolicy){
+    this.syncPolicy = syncPolicy;
+    return this;
+  }
+  public HdfsWriter withRotationPolicy(FileRotationPolicy rotationPolicy){
+    this.rotationPolicy = rotationPolicy;
+    return this;
+  }
+
+  public HdfsWriter addRotationAction(RotationAction action){
+    this.rotationActions.add(action);
+    return this;
+  }
+
+  @Override
+  public void init(Map stormConfig) {
+    this.stormConfig = stormConfig;
+  }
+
+  @Override
+  public void write( String sourceType
+                   , SourceConfig configuration
+                   , List<Tuple> tuples
+                   , List<JSONObject> messages
+                   ) throws Exception
+  {
+    SourceHandler handler = getSourceHandler(sourceType);
+    handler.handle(messages);
+  }
+
+  @Override
+  public void close() {
+    for(SourceHandler handler : sourceHandlerMap.values()) {
+      handler.close();
+    }
+  }
+  private synchronized SourceHandler getSourceHandler(String sourceType) throws IOException {
+    SourceHandler ret = sourceHandlerMap.get(sourceType);
+    if(ret == null) {
+      ret = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, new SourceFileNameFormat(sourceType, fileNameFormat), stormConfig);
+      sourceHandlerMap.put(sourceType, ret);
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
new file mode 100644
index 0000000..1c345b4
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+
+import java.io.IOException;
+
+public class SourceAwareMoveAction implements RotationAction{
+  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
+  private String destination;
+
+  public SourceAwareMoveAction toDestination(String destDir){
+    destination = destDir;
+    return this;
+  }
+
+  private static String getSource(Path filePath) {
+    return filePath.getParent().getName();
+  }
+
+  @Override
+  public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+    Path destPath = new Path(new Path(destination, getSource(filePath)), filePath.getName());
+    LOG.info("Moving file " + filePath + " to " + destPath);
+    boolean success = fileSystem.rename(filePath, destPath);
+    return;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
new file mode 100644
index 0000000..ae0242d
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.task.TopologyContext;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+
+import java.util.Map;
+
+public class SourceFileNameFormat implements FileNameFormat {
+  FileNameFormat delegate;
+  String sourceType;
+  public SourceFileNameFormat(String sourceType, FileNameFormat delegate) {
+    this.delegate = delegate;
+    this.sourceType = sourceType;
+  }
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    this.delegate.prepare(map, topologyContext);
+  }
+
+  @Override
+  public String getName(long l, long l1) {
+    return delegate.getName(l, l1);
+  }
+
+  @Override
+  public String getPath() {
+    return delegate.getPath() + "/" + sourceType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
new file mode 100644
index 0000000..0225137
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.json.simple.JSONObject;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+public class SourceHandler {
+  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
+  List<RotationAction> rotationActions = new ArrayList<>();
+  FileRotationPolicy rotationPolicy;
+  SyncPolicy syncPolicy;
+  FileNameFormat fileNameFormat;
+  private long offset = 0;
+  private int rotation = 0;
+  private transient FSDataOutputStream out;
+  private transient Object writeLock;
+  protected transient Timer rotationTimer; // only used for TimedRotationPolicy
+  protected transient FileSystem fs;
+  protected transient Path currentFile;
+  public SourceHandler(List<RotationAction> rotationActions
+                      , FileRotationPolicy rotationPolicy
+                      , SyncPolicy syncPolicy
+                      , FileNameFormat fileNameFormat
+                      , Map config
+                      ) throws IOException {
+    this.rotationActions = rotationActions;
+    this.rotationPolicy = rotationPolicy;
+    this.syncPolicy = syncPolicy;
+    this.fileNameFormat = fileNameFormat;
+    initialize(config);
+  }
+
+  public void handle(List<JSONObject> messages) throws Exception{
+
+    for(JSONObject message : messages) {
+      byte[] bytes = (message.toJSONString() + "\n").getBytes();
+      synchronized (this.writeLock) {
+        out.write(bytes);
+        this.offset += bytes.length;
+
+        if (this.syncPolicy.mark(null, this.offset)) {
+          if (this.out instanceof HdfsDataOutputStream) {
+            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+          } else {
+            this.out.hsync();
+          }
+          this.syncPolicy.reset();
+        }
+      }
+
+      if (this.rotationPolicy.mark(null, this.offset)) {
+        rotateOutputFile(); // synchronized
+        this.offset = 0;
+        this.rotationPolicy.reset();
+      }
+    }
+  }
+
+  private void initialize(Map config) throws IOException {
+    this.writeLock = new Object();
+    Configuration hdfsConfig = new Configuration();
+    this.fs = FileSystem.get(new Configuration());
+    HdfsSecurityUtil.login(config, hdfsConfig);
+    this.currentFile = createOutputFile();
+    if(this.rotationPolicy instanceof TimedRotationPolicy){
+      long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+      this.rotationTimer = new Timer(true);
+      TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+          try {
+            rotateOutputFile();
+          } catch(IOException e){
+            LOG.warn("IOException during scheduled file rotation.", e);
+          }
+        }
+      };
+      this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+  }
+
+  protected void rotateOutputFile() throws IOException {
+    LOG.info("Rotating output file...");
+    long start = System.currentTimeMillis();
+    synchronized (this.writeLock) {
+      closeOutputFile();
+      this.rotation++;
+
+      Path newFile = createOutputFile();
+      LOG.info("Performing " +  this.rotationActions.size() + " file rotation actions." );
+      for (RotationAction action : this.rotationActions) {
+        action.execute(this.fs, this.currentFile);
+      }
+      this.currentFile = newFile;
+    }
+    long time = System.currentTimeMillis() - start;
+    LOG.info("File rotation took " + time + " ms.");
+  }
+
+  private Path createOutputFile() throws IOException {
+    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+    if(fs.getScheme().equals("file")) {
+      //in the situation where we're running this in a local filesystem, flushing doesn't work.
+      fs.mkdirs(path.getParent());
+      this.out = new FSDataOutputStream(new FileOutputStream(path.toString()), null);
+    }
+    else {
+      this.out = this.fs.create(path);
+    }
+    return path;
+  }
+
+  private void closeOutputFile() throws IOException {
+    this.out.close();
+  }
+
+
+  public void close() {
+    try {
+      closeOutputFile();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to close output file.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java
new file mode 100644
index 0000000..3066334
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.cli.*;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.*;
+
+/**
+ * Command line tool that reads one JSON document per line from standard
+ * input, treats every field whose name ends in ".ts" as a timestamp and prints
+ * statistics about the time elapsed between each pair of those timestamps.
+ */
+public class LatencySummarizer {
+  /** An ordered (earlier metric, later metric) pair used as a statistics key. */
+  public static class Pair extends AbstractMap.SimpleEntry<String, String> {
+    public Pair(String key, String value) {
+      super(key, value);
+    }
+  }
+
+  /**
+   * Latency statistics grouped first by distance (how many positions apart two
+   * metrics are in the timestamp ordering) and then by metric pair.
+   */
+  public static class LatencyStats {
+    private NavigableMap<Integer, Map<Pair, DescriptiveStatistics>> depthMap = new TreeMap<>();
+    // Starts out empty so that a summary can be printed even if no document
+    // was ever processed.
+    private List<String> metrics = new ArrayList<>();
+
+    /** Remembers the metric ordering of the most recently processed document. */
+    public void updateMetrics(List<String> metrics) {
+      this.metrics = metrics;
+    }
+
+    /**
+     * @return the statistics recorded at the given distance; an empty map is
+     *         created and registered if there is none yet
+     */
+    public Map<Pair, DescriptiveStatistics> getStatsMap(int depth) {
+      Map<Pair, DescriptiveStatistics> statsMap = depthMap.get(depth);
+      if(statsMap == null) {
+        statsMap = new HashMap<>();
+        depthMap.put(depth, statsMap);
+      }
+      return statsMap;
+    }
+
+    /**
+     * @return the statistics for the pair at the given distance, created on
+     *         first use
+     */
+    public DescriptiveStatistics getStats( int depth, Pair p) {
+      Map<Pair, DescriptiveStatistics> statsMap = getStatsMap(depth);
+      DescriptiveStatistics stats = statsMap.get(p);
+      if(stats == null) {
+        stats = new DescriptiveStatistics();
+        statsMap.put(p, stats);
+      }
+      return stats;
+    }
+
+    /** Adds one observation for the pair at the given distance. */
+    public void put(int depth, Pair p, double val) {
+      getStats(depth, p).addValue(val);
+    }
+
+    /**
+     * Prints the statistics under the given title.
+     *
+     * @param meanOnly when true only the mean is printed, otherwise the mean,
+     *                 extremes, selected percentiles and standard deviation
+     */
+    public static void summary(String title, DescriptiveStatistics statistics, PrintStream pw, boolean meanOnly) {
+      if(meanOnly) {
+        pw.println(title + ": "
+                + "\n\tMean: " + statistics.getMean()
+        );
+      }
+      else {
+        pw.println(title + ": "
+                + "\n\tMean: " + statistics.getMean()
+                + "\n\tMin: " + statistics.getMin()
+                + "\n\t1th: " + statistics.getPercentile(1)
+                + "\n\t5th: " + statistics.getPercentile(5)
+                + "\n\t10th: " + statistics.getPercentile(10)
+                + "\n\t25th: " + statistics.getPercentile(25)
+                + "\n\t50th: " + statistics.getPercentile(50)
+                + "\n\t90th: " + statistics.getPercentile(90)
+                + "\n\t95th: " + statistics.getPercentile(95)
+                + "\n\t99th: " + statistics.getPercentile(99)
+                + "\n\tMax: " + statistics.getMax()
+                + "\n\tStdDev: " + statistics.getStandardDeviation()
+        );
+      }
+    }
+
+    /**
+     * Prints every pair recorded at the given distance to standard output,
+     * ordered by descending mean. A distance without data prints only the
+     * heading.
+     */
+    public void printDepthSummary(int depth, boolean meanOnly) {
+      Map<Pair, DescriptiveStatistics> statsMap = depthMap.get(depth);
+      System.out.println("\nDistance " + depth);
+      System.out.println("----------------\n");
+      if(statsMap == null) {
+        // Nothing was ever recorded at this distance.
+        return;
+      }
+      List<Map.Entry<Pair, DescriptiveStatistics>> sortedStats = new ArrayList<>(statsMap.entrySet());
+      Collections.sort(sortedStats, new Comparator<Map.Entry<Pair, DescriptiveStatistics>>() {
+        @Override
+        public int compare(Map.Entry<Pair, DescriptiveStatistics> o1, Map.Entry<Pair, DescriptiveStatistics> o2) {
+          // Arguments are swapped to obtain a descending order.
+          return Double.compare(o2.getValue().getMean(), o1.getValue().getMean());
+        }
+      });
+      for(Map.Entry<Pair, DescriptiveStatistics> stats : sortedStats) {
+        summary(stats.getKey().getKey() + " -> " + stats.getKey().getValue(), stats.getValue(), System.out, meanOnly);
+      }
+    }
+
+    /**
+     * Prints the metric ordering last passed to {@link #updateMetrics(List)}
+     * followed by the per-distance summaries in ascending distance order.
+     */
+    public void printSummary(boolean meanOnly) {
+      List<String> flow = metrics == null ? Collections.<String>emptyList() : metrics;
+      System.out.println("Flow:");
+      System.out.println("\t" + Joiner.on(" -> ").join(flow));
+      System.out.println("\nSUMMARY BY DISTANCE\n--------------------------");
+      for(int depth : depthMap.keySet()) {
+        printDepthSummary(depth, meanOnly);
+      }
+    }
+
+  }
+
+  /**
+   * @return the given dotted name without its last dot-separated token, e.g.
+   *         "a.b.ts" yields "a.b"
+   */
+  public static String getBaseMetric(String s) {
+    Iterable<String> tokenIt = Splitter.on('.').split(s);
+    int num = Iterables.size(tokenIt);
+    return Joiner.on('.').join(Iterables.limit(tokenIt, num-1));
+  }
+
+  /**
+   * Extracts every non-null ".ts" field of the document, orders the metrics by
+   * timestamp and records, for every ordered pair, the difference between the
+   * two timestamps at the distance given by their positions in that ordering.
+   *
+   * @throws NumberFormatException if a ".ts" value is not a valid long
+   */
+  public static void updateStats(LatencyStats stats, Map<String, Object> doc) {
+    final Map<String, Long> latencyMap = new HashMap<>();
+    for(Map.Entry<String, Object> kv : doc.entrySet()) {
+      if(kv.getKey().endsWith(".ts") && kv.getValue() != null) {
+        String base = getBaseMetric(kv.getKey());
+        long latency = Long.parseLong(kv.getValue().toString());
+        latencyMap.put( base, latency);
+      }
+    }
+    // Order the metric names themselves rather than keying a map by timestamp:
+    // two metrics that share a timestamp must both be kept. Ties are broken by
+    // name so the ordering does not depend on hash iteration order.
+    List<String> metrics = new ArrayList<>(latencyMap.keySet());
+    Collections.sort(metrics, new Comparator<String>() {
+      @Override
+      public int compare(String m1, String m2) {
+        int byTimestamp = Long.compare(latencyMap.get(m1), latencyMap.get(m2));
+        return byTimestamp != 0 ? byTimestamp : m1.compareTo(m2);
+      }
+    });
+    stats.updateMetrics(metrics);
+    for(int i = 0;i < metrics.size();++i) {
+      for(int j = i+1;j < metrics.size();++j) {
+        Pair p = new Pair(metrics.get(i), metrics.get(j));
+        long ms = latencyMap.get(metrics.get(j)) - latencyMap.get(metrics.get(i));
+        stats.put(j-i, p, ms);
+      }
+    }
+  }
+
+
+
+  /**
+   * Entry point. Accepts -h/--help and -m/--mean_only, reads JSON documents
+   * line by line from standard input and prints the summary at end of input.
+   */
+  public static void main(String... argv) throws IOException {
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("m", "mean_only", false, "Print the mean only when we summarize");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, argv);
+    }
+    catch(ParseException pe) {
+      pe.printStackTrace();
+      final HelpFormatter usageFormatter = new HelpFormatter();
+      usageFormatter.printHelp(LatencySummarizer.class.getSimpleName().toLowerCase(), null, options, null, true);
+      System.exit(-1);
+    }
+    if( cmd.hasOption("h") ){
+      final HelpFormatter usageFormatter = new HelpFormatter();
+      usageFormatter.printHelp(LatencySummarizer.class.getSimpleName().toLowerCase(), null, options, null, true);
+      System.exit(0);
+    }
+    LatencyStats statsMap = new LatencyStats();
+    // Decode explicitly as UTF-8 instead of relying on the platform default.
+    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, java.nio.charset.StandardCharsets.UTF_8));
+    for(String line = null;(line = reader.readLine()) != null;) {
+      if(line.trim().isEmpty()) {
+        // Tolerate blank lines, e.g. a trailing newline in piped input.
+        continue;
+      }
+      Map<String, Object> doc = JSONUtils.INSTANCE.load(line, new TypeReference<HashMap<String, Object>>() {});
+      updateStats(statsMap, doc);
+    }
+    statsMap.printSummary(cmd.hasOption('m'));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
index 13ccd0c..127ca1f 100644
--- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
@@ -83,31 +83,52 @@ public class SourceConfigUtils {
 
   public static void main(String[] args) {
 
-      Options options = new Options();
-      options.addOption("p", true, "Path to source option files");
-      options.addOption("z", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("p", "config_files", true, "Path to the source config files.  Must be named like \"$source\"-config.json");
+      o.setArgName("DIR_NAME");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+      o.setArgName("ZK_QUORUM");
+      o.setRequired(true);
+      options.addOption(o);
+    }
 
     try {
-      CommandLineParser parser = new BasicParser();
-      CommandLine cmd = parser.parse( options, args);
-
-      if( !cmd.hasOption('p') || !cmd.hasOption('z') ){
-        final PrintWriter writer = new PrintWriter(System.out);
+      CommandLineParser parser = new PosixParser();
+      CommandLine cmd = null;
+      try {
+        cmd = parser.parse(options, args);
+      }
+      catch(ParseException pe) {
+        pe.printStackTrace();
         final HelpFormatter usageFormatter = new HelpFormatter();
-        usageFormatter.printUsage(writer, 80, "Apache Metron SourceConfigUtils", options);
-        writer.close();
-        System.exit(1);
+        usageFormatter.printHelp("SourceConfigUtils", null, options, null, true);
+        System.exit(-1);
+      }
+      if( cmd.hasOption("h") ){
+        final HelpFormatter usageFormatter = new HelpFormatter();
+        usageFormatter.printHelp("SourceConfigUtils", null, options, null, true);
+        System.exit(0);
       }
 
-      String sourcePath = cmd.getOptionValue('p');
-      String zkQuorum = cmd.getOptionValue('z');
-
-
-      File root = new File(sourcePath);
+      String zkQuorum = cmd.getOptionValue("z");
+      if(cmd.hasOption("p")) {
+        String sourcePath = cmd.getOptionValue("p");
+        File root = new File(sourcePath);
 
-      if( root.isDirectory() ) {
-        for (File child : root.listFiles()) {
-          writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), zkQuorum);
+        if (root.isDirectory()) {
+          for (File child : root.listFiles()) {
+            writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), zkQuorum);
+          }
         }
       }
 
@@ -115,6 +136,8 @@ public class SourceConfigUtils {
 
     } catch (Exception e) {
       e.printStackTrace();
+      System.exit(-1);
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
index 5d2786d..1f61609 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
@@ -86,6 +86,7 @@ bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020
 bolt.hdfs.wip.file.path=/paloalto/wip
 bolt.hdfs.finished.file.path=/paloalto/rotated
 bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
+index.hdfs.output=/tmp/metron/enriched
 
 ##### HBase #####
 bolt.hbase.table.name=pcap_test
@@ -103,4 +104,3 @@ threat.intel.tracker.table=
 threat.intel.tracker.cf=
 threat.intel.ip.table=
 threat.intel.ip.cf=
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index ec36f2c..45d8a50 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -101,7 +101,25 @@ components:
                 args:
                     - ref: "ipThreatIntelEnrichment"
 
+    -   id: "fileNameFormat"
+        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+        configMethods:
+            -   name: "withPrefix"
+                args:
+                    - "enrichment-"
+            -   name: "withExtension"
+                args:
+                  - ".json"
+            -   name: "withPath"
+                args:
+                    - "${index.hdfs.output}"
 #indexing
+    -   id: "hdfsWriter"
+        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+        configMethods:
+            -   name: "withFileNameFormat"
+                args:
+                    - ref: "fileNameFormat"
     -   id: "indexWriter"
         className: "org.apache.metron.writer.ElasticsearchWriter"
         constructorArgs:
@@ -228,6 +246,14 @@ bolts:
             -   name: "withBulkMessageWriter"
                 args:
                     - ref: "indexWriter"
+    -   id: "hdfsIndexingBolt"
+        className: "org.apache.metron.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "hdfsWriter"
 
 
 streams:
@@ -314,6 +340,13 @@ streams:
             streamId: "message"
             type: FIELDS
             args: ["key"]
+    -   name: "threatIntelJoin -> hdfs"
+        from: "threatIntelJoinBolt"
+        to: "hdfsIndexingBolt"
+        grouping:
+            streamId: "message"
+            type: SHUFFLE
+
     -   name: "indexingBolt -> errorIndexingBolt"
         from: "indexingBolt"
         to: "indexingBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
index 0e530f5..e1e0ad1 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
@@ -83,7 +83,25 @@ components:
                 args:
                     - ref: "ipThreatIntelEnrichment"
 
+    -   id: "fileNameFormat"
+        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+        configMethods:
+            -   name: "withPrefix"
+                args:
+                    - "enrichment-"
+            -   name: "withExtension"
+                args:
+                  - ".json"
+            -   name: "withPath"
+                args:
+                    - "${index.hdfs.output}"
 #indexing
+    -   id: "hdfsWriter"
+        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+        configMethods:
+            -   name: "withFileNameFormat"
+                args:
+                    - ref: "fileNameFormat"
     -   id: "indexWriter"
         className: "org.apache.metron.writer.ElasticsearchWriter"
         constructorArgs:
@@ -220,6 +238,14 @@ bolts:
             -   name: "withBulkMessageWriter"
                 args:
                     - ref: "indexWriter"
+    -   id: "hdfsIndexingBolt"
+        className: "org.apache.metron.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args:
+                    - ref: "hdfsWriter"
 
 
 streams:
@@ -306,6 +332,14 @@ streams:
             streamId: "message"
             type: FIELDS
             args: ["key"]
+
+    -   name: "threatIntelJoin -> hdfs"
+        from: "threatIntelJoinBolt"
+        to: "hdfsIndexingBolt"
+        grouping:
+            streamId: "message"
+            type: SHUFFLE
+
     -   name: "indexingBolt -> errorIndexingBolt"
         from: "indexingBolt"
         to: "indexingBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index 6e62e84..4d5f897 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.metron.integration;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.*;
 import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.Constants;
@@ -43,10 +45,7 @@ import org.junit.Test;
 import org.apache.metron.utils.JSONUtils;
 
 import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Serializable;
+import java.io.*;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
@@ -54,6 +53,7 @@ public class EnrichmentIntegrationTest {
 
   private String fluxPath = "src/main/resources/Metron_Configs/topologies/enrichment/test.yaml";
   private String indexDir = "target/elasticsearch";
+  private String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
   private String sampleParsedPath = "src/main/resources/SampleParsed/YafExampleParsed";
   private String sampleIndexedPath = "src/main/resources/SampleIndexed/YafIndexed";
   private Map<String, String> sourceConfigs = new HashMap<>();
@@ -66,9 +66,68 @@ public class EnrichmentIntegrationTest {
     }
   }
 
+  /**
+   * Walks the given directory tree and deletes every regular file whose name
+   * starts with "enrichment" or ends with ".json". Directories are kept, and a
+   * missing root directory is not an error.
+   *
+   * @param hdfsDirStr path of the root directory to clean
+   */
+  public static void cleanHdfsDir(String hdfsDirStr) {
+    File hdfsDir = new File(hdfsDirStr);
+    if(!hdfsDir.exists()) {
+      return;
+    }
+    Deque<File> pending = new ArrayDeque<>();
+    pending.push(hdfsDir);
+    while(!pending.isEmpty()) {
+      File f = pending.pop();
+      if (f.isDirectory()) {
+        // listFiles() returns null rather than an empty array when the
+        // directory cannot be read.
+        File[] children = f.listFiles();
+        if (children != null) {
+          for(File child : children) {
+            pending.push(child);
+          }
+        }
+      }
+      else if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+        if (!f.delete()) {
+          // Stay best-effort, but make a left-over file visible because it
+          // would be picked up again when the output directory is read back.
+          System.err.println("Unable to delete " + f);
+        }
+      }
+    }
+  }
+
+  /**
+   * Walks the given directory tree and parses the content of every regular
+   * file whose name starts with "enrichment" or ends with ".json" into JSON
+   * documents, one per record returned by {@code TestUtils.readSampleData}.
+   *
+   * @param hdfsDirStr path of the root directory to read
+   * @return all parsed documents; empty if the directory does not exist
+   * @throws IOException if a file cannot be read
+   * @throws RuntimeException wrapping the {@link IOException} if a record
+   *                          cannot be parsed as JSON
+   */
+  public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException {
+    List<Map<String, Object>> ret = new ArrayList<>();
+    File hdfsDir = new File(hdfsDirStr);
+    if(!hdfsDir.exists()) {
+      return ret;
+    }
+    Deque<File> pending = new ArrayDeque<>();
+    pending.push(hdfsDir);
+    while(!pending.isEmpty()) {
+      File f = pending.pop();
+      if(f.isDirectory()) {
+        // listFiles() returns null rather than an empty array when the
+        // directory cannot be read.
+        File[] children = f.listFiles();
+        if (children != null) {
+          for (File child : children) {
+            pending.push(child);
+          }
+        }
+      }
+      else {
+        System.out.println("Processed " + f);
+        if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+          List<byte[]> data = TestUtils.readSampleData(f.getPath());
+          Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
+            @Nullable
+            @Override
+            public Map<String, Object> apply(@Nullable byte[] bytes) {
+              String s = new String(bytes);
+              try {
+                return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+                });
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          }));
+        }
+      }
+    }
+    return ret;
+  }
+
 
   @Test
   public void test() throws Exception {
+    cleanHdfsDir(hdfsDir);
     final String dateFormat = "yyyy.MM.dd.hh";
     final String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
     String yafConfig = "{\n" +
@@ -103,6 +162,7 @@ public class EnrichmentIntegrationTest {
       setProperty("es.port", "9300");
       setProperty("es.ip", "localhost");
       setProperty("index.date.format", dateFormat);
+      setProperty("index.hdfs.output", hdfsDir);
     }};
     final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
       add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
@@ -147,52 +207,69 @@ public class EnrichmentIntegrationTest {
             .withComponent("elasticsearch", esComponent)
             .withComponent("storm", fluxComponent)
             .withMillisecondsBetweenAttempts(10000)
-            .withNumRetries(30)
-            .withMaxTimeMS(300000)
+            .withNumRetries(10)
             .build();
     runner.start();
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
-    List<Map<String, Object>> docs =
-            runner.process(new Processor<List<Map<String, Object>>> () {
-              List<Map<String, Object>> docs = null;
-              public ReadinessState process(ComponentRunner runner){
-                ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
-                if(elasticSearchComponent.hasIndex(index)) {
-                  try {
-                    docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
-                  } catch (IOException e) {
-                    throw new IllegalStateException("Unable to retrieve indexed documents.", e);
-                  }
-                  if(docs.size() < inputMessages.size()) {
+    try {
+      fluxComponent.submitTopology();
+      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+      List<Map<String, Object>> docs =
+              runner.process(new Processor<List<Map<String, Object>>>() {
+                List<Map<String, Object>> docs = null;
+
+                public ReadinessState process(ComponentRunner runner) {
+                  ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
+                  if (elasticSearchComponent.hasIndex(index)) {
+                    List<Map<String, Object>> docsFromDisk;
+                    try {
+                      docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
+                      docsFromDisk = readDocsFromDisk(hdfsDir);
+                      System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+                    } catch (IOException e) {
+                      throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+                    }
+                    if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+                      return ReadinessState.NOT_READY;
+                    } else {
+                      return ReadinessState.READY;
+                    }
+                  } else {
                     return ReadinessState.NOT_READY;
                   }
-                  else {
-                    return ReadinessState.READY;
-                  }
                 }
-                else {
-                  return ReadinessState.NOT_READY;
+
+                public List<Map<String, Object>> getResult() {
+                  return docs;
                 }
-              }
+              });
 
-              public List<Map<String, Object>> getResult() {
-                return docs;
-              }
-            });
 
-    List<byte[]> sampleIndexedMessages = TestUtils.readSampleData(sampleIndexedPath);
-    Assert.assertEquals(sampleIndexedMessages.size(), docs.size());
+      Assert.assertEquals(inputMessages.size(), docs.size());
+
+      for (Map<String, Object> doc : docs) {
+        baseValidation(doc);
+        hostEnrichmentValidation(doc);
+        geoEnrichmentValidation(doc);
+        threatIntelValidation(doc);
 
-    for (Map<String, Object> doc : docs) {
-      baseValidation(doc);
+      }
+      List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir);
+      Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
 
-      hostEnrichmentValidation(doc);
-      geoEnrichmentValidation(doc);
-      threatIntelValidation(doc);
+      Assert.assertEquals(new File(hdfsDir).list().length, 1);
+      Assert.assertEquals(new File(hdfsDir).list()[0], "yaf_doc");
+      for (Map<String, Object> doc : docsFromDisk) {
+        baseValidation(doc);
+        hostEnrichmentValidation(doc);
+        geoEnrichmentValidation(doc);
+        threatIntelValidation(doc);
 
+      }
+    }
+    finally {
+      cleanHdfsDir(hdfsDir);
+      runner.stop();
     }
-    runner.stop();
   }
 
   public static void baseValidation(Map<String, Object> jsonDoc) {