You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:06:21 UTC

[39/43] incubator-metron git commit: METRON-72 Create unified enrichment topology (merrimanr via jsirota) closes apache/incubator-metron#50

METRON-72 Create unified enrichment topology (merrimanr via jsirota) closes apache/incubator-metron#50


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/c737aa9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/c737aa9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/c737aa9d

Branch: refs/heads/Metron_0.1BETA
Commit: c737aa9d90ade31514f21312bd58b0da94c6a98e
Parents: d280837
Author: merrimanr <me...@gmail.com>
Authored: Fri Mar 18 13:30:03 2016 -0700
Committer: James Sirota <Ja...@yahoo.com>
Committed: Fri Mar 18 13:30:03 2016 -0700

----------------------------------------------------------------------
 .../inventory/singlenode-vagrant/group_vars/all |   3 +-
 deployment/roles/metron_ui/files/default.json   | 508 -------------------
 deployment/roles/metron_ui/files/opensoc-ui     |   6 -
 deployment/roles/metron_ui/tasks/main.yml       |  53 +-
 deployment/roles/metron_ui/vars/main.yml        |   1 +
 .../java/org/apache/metron/pcap/PcapUtils.java  |  25 +
 .../org/apache/metron/writer/PcapWriter.java    |   3 +-
 .../metron/parsing/parsers/PcapParser.java      |   9 +-
 metron-streaming/Metron-Pcap_Service/pom.xml    |  27 +-
 .../pcapservice/HBaseConfigurationUtil.java     |   2 +-
 .../pcapservice/PcapReceiverImplRestEasy.java   |   2 +-
 .../metron/pcapservice/RestTestingUtil.java     |  74 +++
 .../metron/pcapservice/rest/PcapService.java    |   1 +
 .../resources/hbase-config-default.properties   |   2 +-
 metron-streaming/Metron-Testing/pom.xml         |  10 +
 .../components/KafkaWithZKComponent.java        |   2 +-
 .../util/integration/util/PcapTestUtil.java     |  77 +++
 .../Metron_Configs/etc/env/config.properties    |   6 +-
 .../Metron_Configs/topologies/pcap/local.yaml   | 440 ----------------
 .../Metron_Configs/topologies/pcap/parse.yaml   |  70 ---
 .../Metron_Configs/topologies/pcap/remote.yaml  | 377 +-------------
 .../Metron_Configs/topologies/pcap/test.yaml    |  74 +++
 .../SampleInput/.PCAPExampleOutput.crc          | Bin 0 -> 44 bytes
 .../resources/SampleInput/PCAPExampleOutput     | Bin 4096 -> 4510 bytes
 .../integration/PcapParserIntegrationTest.java  | 218 ++++++++
 metron-ui/.gitignore                            |   3 -
 metron-ui/config.json                           |   6 +
 metron-ui/lib/metron-ui.js                      |   7 +-
 metron-ui/lib/modules/es-proxy.js               |   2 +-
 metron-ui/lib/modules/login.js                  |   2 +-
 metron-ui/lib/modules/pcap.js                   |  12 +-
 .../lib/public/app/dashboards/default.json      | 461 +++++++++--------
 .../lib/public/app/panels/pcap/module.html      | 106 +---
 metron-ui/lib/public/app/panels/pcap/module.js  |   5 +
 metron-ui/lib/views/alerts.jade                 |  36 +-
 metron-ui/lib/views/index.jade                  |  34 +-
 metron-ui/lib/views/login.jade                  |  34 +-
 metron-ui/package.json                          |  72 +--
 38 files changed, 917 insertions(+), 1853 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/deployment/inventory/singlenode-vagrant/group_vars/all
----------------------------------------------------------------------
diff --git a/deployment/inventory/singlenode-vagrant/group_vars/all b/deployment/inventory/singlenode-vagrant/group_vars/all
index 2d1157e..06acaa9 100644
--- a/deployment/inventory/singlenode-vagrant/group_vars/all
+++ b/deployment/inventory/singlenode-vagrant/group_vars/all
@@ -65,7 +65,7 @@ ambari_server_mem: 512
 threat_intel_bulk_load: False
 
 #Sensors
-install_pycapa: False
+install_pycapa: True
 install_bro: True
 install_snort: True
 install_yaf: False
@@ -74,6 +74,7 @@ sniff_interface: eth1
 pcap_replay_interface: "{{ sniff_interface }}"
 storm_topologies:
     - "{{ metron_directory }}/config/topologies/bro/remote.yaml"
+    - "{{ metron_directory }}/config/topologies/pcap/remote.yaml"
     - "{{ metron_directory }}/config/topologies/snort/remote.yaml"
     - "{{ metron_directory }}/config/topologies/enrichment/remote.yaml"
 pcapservice_port: 8081

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/deployment/roles/metron_ui/files/default.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_ui/files/default.json b/deployment/roles/metron_ui/files/default.json
deleted file mode 100644
index f4c1d2f..0000000
--- a/deployment/roles/metron_ui/files/default.json
+++ /dev/null
@@ -1,508 +0,0 @@
-{
-  "title": "Metron",
-  "services": {
-    "query": {
-      "list": {
-        "1": {
-          "id": 1,
-          "color": "#6ED0E0",
-          "alias": "Yaf",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:yaf_doc"
-        },
-        "2": {
-          "id": 2,
-          "color": "#BA43A9",
-          "alias": "All Events",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:*_doc"
-        },
-        "3": {
-          "id": 3,
-          "color": "#BA43A9",
-          "alias": "All Alerts",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:*_alert"
-        },
-        "4": {
-          "id": 4,
-          "color": "#1F78C1",
-          "alias": "Bro Events",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:bro_doc"
-        },
-        "5": {
-          "id": 5,
-          "color": "#EF843C",
-          "alias": "Bro Alerts",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:bro_alert"
-        },
-        "6": {
-          "id": 6,
-          "color": "#BA43A9",
-          "alias": "Sourcefire Events",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:sourcefire_doc"
-        },
-        "7": {
-          "id": 7,
-          "color": "#BF1B00",
-          "alias": "Sourcefire Alerts",
-          "pin": true,
-          "type": "lucene",
-          "enable": true,
-          "query": "_type:sourcefire_alert"
-        }
-      },
-      "ids": [
-        1,
-        2,
-        3,
-        4,
-        5,
-        6,
-        7
-      ]
-    },
-    "filter": {
-      "list": {
-        "0": {
-          "type": "time",
-          "field": "timestamp",
-          "from": "now-24h",
-          "to": "now",
-          "mandate": "must",
-          "active": true,
-          "alias": "",
-          "id": 0
-        }
-      },
-      "ids": [
-        0
-      ]
-    }
-  },
-  "rows": [
-    {
-      "title": "Histogram",
-      "height": "150px",
-      "editable": true,
-      "collapse": false,
-      "collapsable": true,
-      "panels": [
-        {
-          "span": 6,
-          "editable": true,
-          "type": "histogram",
-          "loadingEditor": false,
-          "mode": "count",
-          "time_field": "timestamp",
-          "value_field": null,
-          "x-axis": true,
-          "y-axis": true,
-          "scale": 1,
-          "y_format": "short",
-          "grid": {
-            "max": null,
-            "min": 0
-          },
-          "queries": {
-            "mode": "selected",
-            "ids": [
-              4,
-              5
-            ]
-          },
-          "locked": false,
-          "annotate": {
-            "enable": false,
-            "query": "*",
-            "size": 20,
-            "field": "_type",
-            "sort": [
-              "_score",
-              "desc"
-            ]
-          },
-          "auto_int": true,
-          "resolution": 100,
-          "interval": "10m",
-          "intervals": [
-            "auto",
-            "1s",
-            "1m",
-            "5m",
-            "10m",
-            "30m",
-            "1h",
-            "3h",
-            "12h",
-            "1d",
-            "1w",
-            "1y"
-          ],
-          "lines": false,
-          "fill": 0,
-          "linewidth": 3,
-          "points": false,
-          "pointradius": 5,
-          "bars": true,
-          "stack": false,
-          "spyable": true,
-          "zoomlinks": true,
-          "options": true,
-          "legend": true,
-          "show_query": true,
-          "interactive": true,
-          "legend_counts": true,
-          "timezone": "browser",
-          "percentage": false,
-          "zerofill": true,
-          "derivative": false,
-          "tooltip": {
-            "value_type": "individual",
-            "query_as_alias": true
-          },
-          "title": "Bro Data"
-        },
-        {
-          "span": 6,
-          "editable": true,
-          "type": "histogram",
-          "loadingEditor": false,
-          "mode": "count",
-          "time_field": "timestamp",
-          "value_field": null,
-          "x-axis": true,
-          "y-axis": true,
-          "scale": 1,
-          "y_format": "none",
-          "grid": {
-            "max": null,
-            "min": 0
-          },
-          "queries": {
-            "mode": "selected",
-            "ids": [
-              6,
-              7
-            ]
-          },
-          "locked": false,
-          "annotate": {
-            "enable": false,
-            "query": "*",
-            "size": 20,
-            "field": "_type",
-            "sort": [
-              "_score",
-              "desc"
-            ]
-          },
-          "auto_int": true,
-          "resolution": 100,
-          "interval": "10m",
-          "intervals": [
-            "auto",
-            "1s",
-            "1m",
-            "5m",
-            "10m",
-            "30m",
-            "1h",
-            "3h",
-            "12h",
-            "1d",
-            "1w",
-            "1y"
-          ],
-          "lines": false,
-          "fill": 0,
-          "linewidth": 3,
-          "points": false,
-          "pointradius": 5,
-          "bars": true,
-          "stack": true,
-          "spyable": true,
-          "zoomlinks": true,
-          "options": true,
-          "legend": true,
-          "show_query": true,
-          "interactive": true,
-          "legend_counts": true,
-          "timezone": "browser",
-          "percentage": false,
-          "zerofill": true,
-          "derivative": false,
-          "tooltip": {
-            "value_type": "cumulative",
-            "query_as_alias": true
-          },
-          "title": "Sourcefire Data"
-        }
-      ],
-      "notice": false
-    },
-    {
-      "title": "Alerts",
-      "height": "150px",
-      "editable": true,
-      "collapse": false,
-      "collapsable": true,
-      "panels": [
-        {
-          "error": false,
-          "span": 12,
-          "editable": true,
-          "type": "table",
-          "loadingEditor": false,
-          "size": 10,
-          "pages": 100,
-          "offset": 0,
-          "sort": [
-            "timestamp",
-            "desc"
-          ],
-          "overflow": "min-height",
-          "fields": [
-            "_type",
-            "timestamp",
-            "priority",
-            "designated_host",
-            "description"
-          ],
-          "highlight": [],
-          "sortable": true,
-          "header": true,
-          "paging": true,
-          "field_list": false,
-          "all_fields": false,
-          "trimFactor": 400,
-          "localTime": true,
-          "timeField": "timestamp",
-          "spyable": true,
-          "queries": {
-            "mode": "selected",
-            "ids": [
-              3
-            ]
-          },
-          "locked": false,
-          "style": {
-            "font-size": "9pt"
-          },
-          "normTimes": true,
-          "title": "Alerts"
-        }
-      ],
-      "notice": false
-    },
-    {
-      "title": "Events",
-      "height": "150px",
-      "editable": true,
-      "collapse": false,
-      "collapsable": true,
-      "panels": [
-        {
-          "error": false,
-          "span": 12,
-          "editable": true,
-          "type": "table",
-          "loadingEditor": false,
-          "size": 10,
-          "pages": 100,
-          "offset": 0,
-          "sort": [
-            "message.timestamp",
-            "desc"
-          ],
-          "overflow": "min-height",
-          "fields": [
-            "_type",
-            "message.timestamp",
-            "message.original_string"
-          ],
-          "highlight": [],
-          "sortable": true,
-          "header": true,
-          "paging": true,
-          "field_list": false,
-          "all_fields": false,
-          "trimFactor": 400,
-          "localTime": true,
-          "timeField": "message.timestamp",
-          "spyable": true,
-          "queries": {
-            "mode": "selected",
-            "ids": [
-              2
-            ]
-          },
-          "locked": false,
-          "style": {
-            "font-size": "9pt"
-          },
-          "normTimes": true,
-          "title": "Events"
-        }
-      ],
-      "notice": false
-    },
-    {
-      "title": "PCAP Data",
-      "height": "150px",
-      "editable": true,
-      "collapse": false,
-      "collapsable": true,
-      "panels": [
-        {
-          "error": false,
-          "span": 12,
-          "editable": true,
-          "type": "pcap",
-          "loadingEditor": false,
-          "title": "PCAP Data"
-        },
-        {
-          "error": false,
-          "span": 12,
-          "editable": true,
-          "type": "table",
-          "loadingEditor": false,
-          "size": 100,
-          "pages": 5,
-          "offset": 0,
-          "sort": [
-            "_score",
-            "desc"
-          ],
-          "overflow": "min-height",
-          "fields": [],
-          "highlight": [],
-          "sortable": true,
-          "header": true,
-          "paging": true,
-          "field_list": true,
-          "all_fields": false,
-          "trimFactor": 300,
-          "localTime": false,
-          "timeField": "@timestamp",
-          "spyable": true,
-          "queries": {
-            "mode": "selected",
-            "ids": [
-              1
-            ]
-          },
-          "style": {
-            "font-size": "9pt"
-          },
-          "normTimes": true,
-          "title": "Yaf"
-        }
-      ],
-      "notice": false
-    }
-  ],
-  "editable": true,
-  "failover": false,
-  "index": {
-    "interval": "none",
-    "pattern": "[logstash-]YYYY.MM.DD",
-    "default": "*alert*,*_index*",
-    "warm_fields": false
-  },
-  "style": "dark",
-  "realtime": true,
-  "panel_hints": true,
-  "pulldowns": [
-    {
-      "type": "query",
-      "collapse": false,
-      "notice": false,
-      "enable": true,
-      "query": "*",
-      "pinned": true,
-      "history": [
-        "_type:sourcefire_doc",
-        "_type:bro_doc",
-        "_type:*_doc",
-        "_type:sourcefire_alert",
-        "_type:bro_alert",
-        "_type:*_alert",
-        "*"
-      ],
-      "remember": 10
-    },
-    {
-      "type": "filtering",
-      "collapse": true,
-      "notice": true,
-      "enable": true
-    }
-  ],
-  "nav": [
-    {
-      "type": "timepicker",
-      "collapse": false,
-      "notice": false,
-      "enable": true,
-      "status": "Stable",
-      "time_options": [
-        "5m",
-        "15m",
-        "1h",
-        "6h",
-        "12h",
-        "24h",
-        "2d",
-        "7d",
-        "30d"
-      ],
-      "refresh_intervals": [
-        "5s",
-        "10s",
-        "30s",
-        "1m",
-        "5m",
-        "15m",
-        "30m",
-        "1h",
-        "2h",
-        "1d"
-      ],
-      "timefield": "timestamp",
-      "now": true,
-      "filter_id": 0
-    }
-  ],
-  "loader": {
-    "save_gist": false,
-    "save_elasticsearch": true,
-    "save_local": true,
-    "save_default": true,
-    "save_temp": true,
-    "save_temp_ttl_enable": true,
-    "save_temp_ttl": "30d",
-    "load_gist": false,
-    "load_elasticsearch": true,
-    "load_elasticsearch_size": 20,
-    "load_local": false,
-    "hide": false
-  },
-  "refresh": false
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/deployment/roles/metron_ui/files/opensoc-ui
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_ui/files/opensoc-ui b/deployment/roles/metron_ui/files/opensoc-ui
deleted file mode 100644
index 16e3dbe..0000000
--- a/deployment/roles/metron_ui/files/opensoc-ui
+++ /dev/null
@@ -1,6 +0,0 @@
-{
-  "auth":false,
-  "secrent":"secret",
-  "elasticsearch": { "url": "http://host:port" },
-  "pcap": { "url": "http://host:port/pcap/pcapGetter","mock": false }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/deployment/roles/metron_ui/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_ui/tasks/main.yml b/deployment/roles/metron_ui/tasks/main.yml
index eb2220d..861abbd 100644
--- a/deployment/roles/metron_ui/tasks/main.yml
+++ b/deployment/roles/metron_ui/tasks/main.yml
@@ -23,46 +23,33 @@
       - { package: 'nodejs'}
       - { package: 'npm'}
 
-- name: Download nvm installer
-  get_url: url=https://raw.githubusercontent.com/creationix/nvm/v0.30.1/install.sh dest=/tmp/nvm-install.sh
-
-- name: Change nvm installer permissions
-  file: path=/tmp/nvm-install.sh mode=0755
-
-- name: Install nvm
-  shell: /tmp/nvm-install.sh creates=~/.nvm/nvm.sh
-
-- name: Install Metron UI
-  shell: source ~/.bashrc && nvm install 0.10.35 && nvm use 0.10.35 && nvm alias default 0.10.35 && npm install -g opensoc-ui creates=/root/.nvm/v0.10.35/lib/node_modules/opensoc-ui/package.json
-
-- name: Add Metron UI config
+- name: Copy Metron UI source
   copy:
-    src: opensoc-ui
-    dest: ~/.opensoc-ui
-    mode: 0644
+    src: "{{ playbook_dir }}/../../metron-ui"
+    dest: "{{ metron_directory }}"
 
-- name: Configure opensoc-ui
-  lineinfile: >
-    dest=~/.opensoc-ui
+- name: Configure Metron UI
+  lineinfile:
+    dest="{{ metron_directory }}/metron-ui/config.json"
     regexp="{{ item.regexp }}"
     line="{{ item.line }}"
     state=present
   with_items:
     - { regexp: '"elasticsearch":', line: '"elasticsearch": { "url": "http://{{ groups.search[0] }}:{{ elasticsearch_web_port }}" },' }
-    - { regexp: '"pcap":', line: '  "pcap": { "url": "http://{{ groups.web[0] }}:{{ pcapservice_port }}/pcap/pcapGetter","mock": false }' }
+    - { regexp: '"pcap":', line: '  "pcap": { "url": "http://{{ groups.web[0] }}:{{ pcapservice_port }}/pcapGetter","mock": false }' }
 
-- name: Fix pcap_all error
-  replace:
-    dest: /root/.nvm/v0.10.35/lib/node_modules/opensoc-ui/lib/static_dist/app/panels/pcap/module.js
-    regexp: "pcap_all"
-    replace: "pcap_*"
-    backup: yes
+- name: Install Node dependencies
+  npm:
+    name: "{{ item }}"
+    path: "{{ metron_ui_directory }}"
+    global: true
+  with_items:
+    - pm2
 
-- name: Configure default dashboard
-  copy:
-    src: default.json
-    dest: /root/.nvm/v0.10.35/lib/node_modules/opensoc-ui/lib/static_dist/app/dashboards/
-    mode: 0644
+- name: Install Metron UI
+  npm:
+    path: "{{ metron_ui_directory }}"
+    production: false  # install devDependencies as well
 
-- name: Start Metron UI service
-  shell: source ~/.bashrc && opensoc-ui start
+- name: Start Metron UI
+  shell: "pm2 start {{ metron_ui_directory }}/lib/metron-ui.js --name metron"  # follows metron_directory/metron_version instead of a fixed path

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/deployment/roles/metron_ui/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_ui/vars/main.yml b/deployment/roles/metron_ui/vars/main.yml
index ff647ae..ea41dfa 100644
--- a/deployment/roles/metron_ui/vars/main.yml
+++ b/deployment/roles/metron_ui/vars/main.yml
@@ -17,5 +17,6 @@
 ---
 metron_version: 0.1BETA
 metron_directory: /usr/metron/{{ metron_version }}
+metron_ui_directory: "{{ metron_directory }}/metron-ui"
 metron_jar_url: http://192.168.1.174:8080/repository/internal/com/opensoc/OpenSOC-Topologies/{{ metron_version }}/OpenSOC-Topologies-{{ metron_version }}.jar
 metron_source_url: https://github.com/merrimanr/incubator-metron/archive/master.zip

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
index baa3bbd..a046801 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/pcap/PcapUtils.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang.StringUtils;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import org.json.simple.JSONObject;
 
 /**
  * The Class PcapUtils.
@@ -265,6 +266,30 @@ public class PcapUtils {
     return sb.toString();
   }
 
+  /** Builds the session key for a parsed pcap message; absent numeric fields are forwarded as null. */
+  public static String getSessionKey(JSONObject message) {
+    // Stringify through Object so a missing value yields null (not an NPE) and any Number type is accepted.
+    String[] names = {"ip_protocol", "ip_src_port", "ip_dst_port", "ip_id", "frag_offset"};
+    String[] text = new String[names.length];
+    for (int i = 0; i < names.length; i++) {
+      Object raw = message.get(names[i]);
+      text[i] = raw == null ? null : raw.toString();
+    }
+    return PcapUtils.getSessionKey((String) message.get("ip_src_addr"), (String) message.get("ip_dst_addr"),
+            text[0], text[1], text[2], text[3], text[4]);  // NOTE(review): confirm the 7-arg overload accepts null protocol/ports
+  }
+
+  /** Builds the five-field session key (no ip id / fragment offset); a null protocol or port is written as "0". */
+  public static String getPartialSessionKey(String srcIp, String dstIp,
+                                            String protocol, String srcPort, String dstPort) {
+    String protocolPart = protocol == null ? "0" : protocol;
+    String srcPortPart = srcPort == null ? "0" : srcPort;
+    String dstPortPart = dstPort == null ? "0" : dstPort;
+    StringBuilder key = new StringBuilder(40).append(convertIpv4IpToHex(srcIp)).append(SESSION_KEY_SEPERATOR);
+    key.append(convertIpv4IpToHex(dstIp)).append(SESSION_KEY_SEPERATOR).append(protocolPart);
+    return key.append(SESSION_KEY_SEPERATOR).append(srcPortPart).append(SESSION_KEY_SEPERATOR).append(dstPortPart).toString();
+  }
+
   /**
    * Gets the session key.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
index b5ab587..3320bda 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
@@ -18,6 +18,7 @@
 package org.apache.metron.writer;
 
 import backtype.storm.tuple.Tuple;
+import org.apache.metron.pcap.PcapUtils;
 import org.json.simple.JSONObject;
 
 import java.util.HashMap;
@@ -34,7 +35,7 @@ public class PcapWriter extends HBaseWriter {
 
   @Override
   public byte[] getKey(Tuple tuple, JSONObject message) {
-    String key = (String) message.get("pcap_id");
+    String key = PcapUtils.getSessionKey(message);
     return key.getBytes();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
index c5677f3..bfd943e 100644
--- a/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
+++ b/metron-streaming/Metron-MessageParsers/src/main/java/org/apache/metron/parsing/parsers/PcapParser.java
@@ -42,6 +42,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class PcapParser implements MessageParser<JSONObject>, Serializable {
@@ -96,7 +97,13 @@ public class PcapParser implements MessageParser<JSONObject>, Serializable {
 
   @Override
   public boolean validate(JSONObject message) {
-    return true;
+    // Valid only if all five session-key fields (addresses, protocol, ports) are present as keys.
+    List<String> requiredFields = Arrays.asList("ip_src_addr",
+            "ip_dst_addr",
+            "ip_protocol",
+            "ip_src_port",
+            "ip_dst_port");
+    return message.keySet().containsAll(requiredFields);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Pcap_Service/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml b/metron-streaming/Metron-Pcap_Service/pom.xml
index 441a65d..bb850ae 100644
--- a/metron-streaming/Metron-Pcap_Service/pom.xml
+++ b/metron-streaming/Metron-Pcap_Service/pom.xml
@@ -114,15 +114,24 @@
 			<version>2.3</version>
 		</dependency>
 		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${global_hbase_guava_version}</version>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.hbase</groupId>
 			<artifactId>hbase-client</artifactId>
 			<version>${global_hbase_version}</version>
-			<scope>provided</scope>
+			<!--scope>provided</scope-->
 			<exclusions>
 				<exclusion>
 					<groupId>org.slf4j</groupId>
 					<artifactId>slf4j-log4j12</artifactId>
 				</exclusion>
+				<!--exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion-->
 			</exclusions>
 		</dependency>
 		<dependency>
@@ -141,15 +150,25 @@
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-common</artifactId>
 			<version>${global_hadoop_version}</version>
-			<scope>provided</scope>
+			<!--scope>provided</scope-->
 		</dependency>
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
 			<version>${global_hadoop_version}</version>
-			<scope>provided</scope>
+			<!--scope>provided</scope-->
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-client</artifactId>
+			<version>${global_hadoop_version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
-
 		<dependency>
 			<groupId>org.springframework.integration</groupId>
 			<artifactId>spring-integration-http</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java
index f7457fb..75932ab 100644
--- a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java
@@ -80,7 +80,7 @@ public class HBaseConfigurationUtil {
       if (connectionAvailable()) {
         return;
       }
-      clusterConnection = HConnectionManager.createConnection(read());
+      clusterConnection = HConnectionManager.createConnection(HBaseConfiguration.create());
       addShutdownHook();
       System.out.println("Created HConnection and added shutDownHook");
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index ae59643..e747350 100644
--- a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -222,7 +222,7 @@ public class PcapReceiverImplRestEasy {
 	
 	    PcapsResponse response = null;
 	    try {
-	      String sessionKey = PcapUtils.getSessionKey(srcIp, dstIp, protocol,
+	      String sessionKey = PcapUtils.getPartialSessionKey(srcIp, dstIp, protocol,
 	          srcPort, dstPort);
 	      LOGGER.info("sessionKey =" + sessionKey);
 	      IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java
index aa6fb29..e452b89 100644
--- a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java
@@ -223,6 +223,80 @@ public class RestTestingUtil {
 
   }
 
+  private static void getPcapsByIdentifiers(String srcIp, String dstIp, String protocol, String srcPort, String dstPort) {
+    System.out
+            .println("**********************getPcapsByKeysRange ******************************************************************************************");
+    // 1.
+    String url = "http://" + hostName
+            + "/pcapGetter/getPcapsByIdentifiers?srcIp={srcIp}"
+            + "&dstIp={dstIp}" + "&protocol={protocol}" + "&srcPort={srcPort}"
+            + "&dstPort={dstPort}";
+    // default values
+    String startTime = "-1";
+    String endTime = "-1";
+    String maxResponseSize = "6";
+    @SuppressWarnings("rawtypes")
+    Map map = new HashMap();
+    map.put("srcIp", srcIp);
+    map.put("dstIp", dstIp);
+    map.put("protocol", protocol);
+    map.put("srcPort", srcPort);
+    map.put("dstPort", dstPort);
+
+    RestTemplate template = new RestTemplate();
+
+    // set headers and entity to send
+    HttpHeaders headers = new HttpHeaders();
+    headers.set("Accept", MediaType.APPLICATION_OCTET_STREAM_VALUE);
+    HttpEntity<Object> requestEntity = new HttpEntity<Object>(headers);
+
+    // 1.
+    ResponseEntity<byte[]> response1 = template.exchange(url, HttpMethod.GET,
+            requestEntity, byte[].class, map);
+    System.out
+            .println("----------------------------------------------------------------------------------------------------");
+    System.out
+            .format(
+                    "getPcapsByIdentifiers : request= <srcIp=%s; dstIp=%s; protocol=%s; srcPort=%s; dstPort=%s> \n response= %s \n",
+                    srcIp, dstIp, protocol, endTime, srcPort, dstPort, response1);
+    System.out
+            .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 2. with time range
+    startTime = System.getProperty("startTime", "-1");
+    endTime = System.getProperty("endTime", "-1");
+    map.put("startTime", startTime);
+    map.put("endTime", endTime);
+    ResponseEntity<byte[]> response2 = template.exchange(url, HttpMethod.GET,
+            requestEntity, byte[].class, map);
+    System.out
+            .println("----------------------------------------------------------------------------------------------------");
+    System.out
+            .format(
+                    "getPcapsByIdentifiers : request= <srcIp=%s; dstIp=%s; protocol=%s; srcPort=%s; dstPort=%s> \n response= %s \n",
+                    srcIp, dstIp, protocol, endTime, srcPort, dstPort, response2);
+    System.out
+            .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 3. with maxResponseSize
+    maxResponseSize = System.getProperty("maxResponseSize", "6");
+    map.put("maxResponseSize", maxResponseSize);
+    ResponseEntity<byte[]> response3 = template.exchange(url, HttpMethod.GET,
+            requestEntity, byte[].class, map);
+    System.out
+            .println("----------------------------------------------------------------------------------------------------");
+    System.out
+            .format(
+                    "getPcapsByIdentifiers : request= <srcIp=%s; dstIp=%s; protocol=%s; srcPort=%s; dstPort=%s> \n response= %s \n",
+                    srcIp, dstIp, protocol, endTime, srcPort, dstPort, response3);
+    System.out
+            .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+  }
+
   /**
    * The main method.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
index d11113a..d3bbe24 100644
--- a/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
+++ b/metron-streaming/Metron-Pcap_Service/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java
@@ -19,6 +19,7 @@ package org.apache.metron.pcapservice.rest;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Pcap_Service/src/main/resources/hbase-config-default.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Pcap_Service/src/main/resources/hbase-config-default.properties b/metron-streaming/Metron-Pcap_Service/src/main/resources/hbase-config-default.properties
index 912485c..0f47193 100644
--- a/metron-streaming/Metron-Pcap_Service/src/main/resources/hbase-config-default.properties
+++ b/metron-streaming/Metron-Pcap_Service/src/main/resources/hbase-config-default.properties
@@ -25,7 +25,7 @@ zookeeper.recovery.retry=0
 #hbase table configuration
 hbase.table.name=pcap
 hbase.table.column.family=t
-hbase.table.column.qualifier=pcap
+hbase.table.column.qualifier=value
 hbase.table.column.maxVersions=5
 
 # scan size limit configuration in MB or KB; if the input is negative or greater than max value throw an error.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Testing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/pom.xml b/metron-streaming/Metron-Testing/pom.xml
index 5f1c946..7498375 100644
--- a/metron-streaming/Metron-Testing/pom.xml
+++ b/metron-streaming/Metron-Testing/pom.xml
@@ -105,6 +105,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>Metron-MessageParsers</artifactId>
+      <version>0.1BETA</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
index 83ecd42..54b4f27 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/KafkaWithZKComponent.java
@@ -218,7 +218,7 @@ public class KafkaWithZKComponent implements InMemoryComponent {
     }
   }
 
-  public void writeMessages(String topic, List<byte[]> messages) {
+  public void writeMessages(String topic, Collection<byte[]> messages) {
     KafkaProducer<String, byte[]> kafkaProducer = createProducer();
     for(byte[] message: messages) {
       kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/PcapTestUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/PcapTestUtil.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/PcapTestUtil.java
new file mode 100644
index 0000000..52c8288
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/util/PcapTestUtil.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.util;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.metron.parsing.parsers.PcapParser;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+/** Dev utility: dumps up to 20 valid pcap messages from the "pcap" Kafka topic on node1 into a SequenceFile. */
+public class PcapTestUtil {
+
+  public static final String OUTPUT_PATH = "./metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutputTest";
+
+  /** Issues one fetch, keeps messages that PcapParser parses and validates, and appends them keyed by index. */
+  public static void main(String[] args) throws IOException {
+    String topic = "pcap";
+    int maxMessages = 20;
+    SimpleConsumer consumer = new SimpleConsumer("node1", 6667, 100000, 64 * 1024, "consumer");
+    Writer writer = null;
+    try {
+      FetchRequest req = new FetchRequestBuilder().clientId("consumer").addFetch(topic, 0, 0, 100000).build();
+      Iterator<MessageAndOffset> results = consumer.fetch(req).messageSet(topic, 0).iterator();
+      writer = SequenceFile.createWriter(new Configuration(),
+              Writer.file(new Path(OUTPUT_PATH)),
+              Writer.compression(SequenceFile.CompressionType.NONE),
+              Writer.keyClass(IntWritable.class),
+              Writer.valueClass(BytesWritable.class));
+      PcapParser pcapParser = new PcapParser();
+      pcapParser.init();
+      int index = 0;
+      while (index < maxMessages && results.hasNext()) {
+        ByteBuffer payload = results.next().message().payload();
+        // remaining() rather than limit(): get(bytes) reads from the buffer's current position.
+        byte[] bytes = new byte[payload.remaining()];
+        payload.get(bytes);
+        List<JSONObject> parsed = pcapParser.parse(bytes);
+        if (parsed != null && parsed.size() > 0 && pcapParser.validate(parsed.get(0))) {
+          writer.append(new IntWritable(index++), new BytesWritable(bytes));
+        }
+      }
+    } finally {
+      // Release the broker connection and the output file even when fetching, parsing or appending fails.
+      consumer.close();
+      if (writer != null) writer.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
index 9f66a95..5d36ef1 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
@@ -17,8 +17,8 @@
 
 ##### Kafka #####
 
-kafka.zk=zkpr1:2181,zkpr2:2181,zkpr3:2181
-kafka.broker=kfka1:6667
+kafka.zk=node1:2181
+kafka.broker=node1:6667
 spout.kafka.topic.asa=asa
 spout.kafka.topic.bro=bro
 spout.kafka.topic.fireeye=fireeye
@@ -89,7 +89,7 @@ bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
 index.hdfs.output=/tmp/metron/enriched
 
 ##### HBase #####
-bolt.hbase.table.name=pcap_test
+bolt.hbase.table.name=pcap
 bolt.hbase.table.fields=t:value
 bolt.hbase.table.key.tuple.field.name=key
 bolt.hbase.table.timestamp.tuple.field.name=timestamp

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml
deleted file mode 100644
index 3987a18..0000000
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml
+++ /dev/null
@@ -1,440 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "pcap-local"
-config:
-    topology.workers: 1
-
-components:
-# Parser
-    -   id: "parser"
-        className: "org.apache.metron.parsing.parsers.PcapParser"
-        configMethods:
-            -   name: "withTsPrecision"
-                args: ["MICRO"]
-# Threat Intel
-    -   id: "ipThreatIntelConfig"
-        className: "org.apache.metron.threatintel.ThreatIntelConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withTrackerHBaseTable"
-                args:
-                    - "${threat.intel.tracker.table}"
-            -   name: "withTrackerHBaseCF"
-                args:
-                    - "${threat.intel.tracker.cf}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${threat.intel.ip.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${threat.intel.ip.cf}"
-    -   id: "ipThreatIntelAdapter"
-        className: "org.apache.metron.threatintel.ThreatIntelAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "ipThreatIntelConfig"
-    -   id: "ipThreatIntelEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-           - name: "type"
-             value: "ip"
-           - name: "fields"
-             value: ["message/ip_src_addr", "message/ip_dst_addr"]
-           - name: "adapter"
-             ref: "ipThreatIntelAdapter"
-    -   id: "threatIntels"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "ipThreatIntelEnrichment"
-# Enrichment
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.integration.util.mock.MockGeoAdapter"
-    -   id: "geoEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "type"
-                value:  "geo"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "geoEnrichmentAdapter"
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
-        constructorArgs:
-            - '${org.apache.metron.enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "type"
-                value:  "host"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "hostEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-#indexing
-    -   id: "indexAdapter"
-        className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
-    -   id: "metricConfig"
-        className: "org.apache.commons.configuration.BaseConfiguration"
-        configMethods:
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.graphite"
-                    - "${org.apache.metron.metrics.reporter.graphite}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.console"
-                    - "${org.apache.metron.metrics.reporter.console}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.jmx"
-                    - "${org.apache.metron.metrics.reporter.jmx}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.address"
-                    - "${org.apache.metron.metrics.graphite.address}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.port"
-                    - "${org.apache.metron.metrics.graphite.port}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
-#hbase bolt
-    -   id: "hbaseConfig"
-        className: "org.apache.metron.hbase.TupleTableConfig"
-        configMethods:
-            -   name: "withFields"
-                args:
-                    - "${bolt.hbase.table.fields}"
-            -   name: "withTable"
-                args:
-                    - "${bolt.hbase.table.name}"
-            -   name: "withRowKeyField"
-                args:
-                    - "${bolt.hbase.table.key.tuple.field.name}"
-            -   name: "withTimestampField"
-                args:
-                    - "${bolt.hbase.table.timestamp.tuple.field.name}"
-            -   name: "withBatch"
-                args:
-                    - ${bolt.hbase.enable.batching}
-            -   name: "withConnectorImpl"
-                args:
-                    - "${hbase.provider.impl}"
-spouts:
-    -   id: "testingSpout"
-        className: "org.apache.metron.test.spouts.GenericInternalTestSpout"
-        parallelism: 1
-        configMethods:
-            -   name: "withBinaryConverter"
-                args:
-                    - "${pcap.binary.converter}"
-            -   name: "withFilename"
-                args:
-                    - "${input.path}SampleInput/PCAPExampleOutput"
-            -   name: "withRepeating"
-                args:
-                    - ${testing.repeating}
-bolts:
-    -   id: "hbaseBolt"
-        className: "org.apache.metron.hbase.HBaseBolt"
-        constructorArgs:
-            - ref: "hbaseConfig"
-            - "${kafka.zk}"
-    -   id: "parserBolt"
-        className: "org.apache.metron.bolt.PcapParserBolt"
-        configMethods:
-            -   name: "withMessageParser"
-                args:
-                    - ref: "parser"
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-    -   id: "indexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "pcap_index"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.dd.hh"
-            -   name: "withDocumentName"
-                args:
-                    - "pcap_doc"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "errorIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "error"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM"
-            -   name: "withDocumentName"
-                args:
-                    - "pcap_error"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-# Threat Intel Bolts
-    -   id: "threatIntelSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-    -   id: "ipThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "ipThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "threatIntelJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-            -   name: "withType"
-                args:
-                    - "alerts"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-# Enrichment Bolts
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "joinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        configMethods:
-        -   name: "withEnrichments"
-            args:
-                - ref: "enrichments"
-        -   name: "withMaxCacheSize"
-            args: [10000]
-        -   name: "withMaxTimeRetain"
-            args: [10]
-
-streams:
-#parser
-    -   name: "spout -> parser"
-        from: "testingSpout"
-        to: "parserBolt"
-        grouping:
-            type: SHUFFLE
-#hbase
-#    -   name: "parser -> hbase"
-#        from: "parserBolt"
-#        to: "hbaseBolt"
-#        grouping:
-#            streamId: "raw"
-#            type: FIELDS
-#            args: ["key"]
-#enrichment
-    -   name: "parser -> host"
-        from: "parserBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> geo"
-        from: "parserBolt"
-        to: "geoEnrichmentBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> join"
-        from: "parserBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "geo -> join"
-        from: "geoEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-
-#threat intel
-    -   name: "enrichmentJoin -> threatSplit"
-        from: "joinBolt"
-        to: "threatIntelSplitBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatSplit -> ip"
-        from: "threatIntelSplitBolt"
-        to: "ipThreatIntelBolt"
-        grouping:
-            streamId: "ip"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "ip -> join"
-        from: "ipThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "ip"
-            type: FIELDS
-            args: ["key"]
-    -   name: "threatIntelSplit -> threatIntelJoin"
-        from: "threatIntelSplitBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#indexing
-    -   name: "threatIntelJoin -> indexing"
-        from: "threatIntelJoinBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#errors
-    -   name: "parser -> errors"
-        from: "parserBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "indexing -> errors"
-        from: "indexingBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
deleted file mode 100644
index bfc8527..0000000
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/parse.yaml
+++ /dev/null
@@ -1,70 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "pcap-parse"
-config:
-    topology.workers: 1
-
-components:
-    -   id: "parser"
-        className: "org.apache.metron.parsing.parsers.PcapParser"
-    -   id: "writer"
-        className: "org.apache.metron.writer.PcapWriter"
-        constructorArgs:
-            - "${bolt.hbase.table.name}"
-            - "${bolt.hbase.table.fields}"
-    -   id: "zkHosts"
-        className: "storm.kafka.ZkHosts"
-        constructorArgs:
-            - "${kafka.zk}"
-    -   id: "kafkaConfig"
-        className: "storm.kafka.SpoutConfig"
-        constructorArgs:
-            # zookeeper hosts
-            - ref: "zkHosts"
-            # topic name
-            - "${spout.kafka.topic.pcap}"
-            # zk root
-            - ""
-            # id
-            - "${spout.kafka.topic.pcap}"
-        properties:
-            -   name: "ignoreZkOffsets"
-                value: true
-            -   name: "startOffsetTime"
-                value: -1
-
-spouts:
-    -   id: "kafkaSpout"
-        className: "storm.kafka.KafkaSpout"
-        constructorArgs:
-            - ref: "kafkaConfig"
-
-bolts:
-    -   id: "parserBolt"
-        className: "org.apache.metron.bolt.ParserBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-            - "pcap"
-            - ref: "parser"
-            - ref: "writer"
-
-streams:
-    -   name: "spout -> bolt"
-        from: "kafkaSpout"
-        to: "parserBolt"
-        grouping:
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
index 5bdbc17..bfc8527 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml
@@ -14,137 +14,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-name: "pcap"
+name: "pcap-parse"
 config:
     topology.workers: 1
 
 components:
-# Parser
     -   id: "parser"
         className: "org.apache.metron.parsing.parsers.PcapParser"
-        configMethods:
-            -   name: "withTsPrecision"
-                args: ["MICRO"]
-# Threat Intel
-    -   id: "ipThreatIntelConfig"
-        className: "org.apache.metron.threatintel.ThreatIntelConfig"
-        configMethods:
-            -   name: "withTrackerHBaseTable"
-                args:
-                    - "${threat.intel.tracker.table}"
-            -   name: "withTrackerHBaseCF"
-                args:
-                    - "${threat.intel.tracker.cf}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${threat.intel.ip.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${threat.intel.ip.cf}"
-    -   id: "ipThreatIntelAdapter"
-        className: "org.apache.metron.threatintel.ThreatIntelAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "ipThreatIntelConfig"
-    -   id: "ipThreatIntelEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-           - name: "type"
-             value: "ip"
-           - name: "fields"
-             value: ["message/ip_src_addr", "message/ip_dst_addr"]
-           - name: "adapter"
-             ref: "ipThreatIntelAdapter"
-    -   id: "threatIntels"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "ipThreatIntelEnrichment"
-# Enrichment
-
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+    -   id: "writer"
+        className: "org.apache.metron.writer.PcapWriter"
         constructorArgs:
-            - '${org.apache.metron.enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.domain.Enrichment"
-        properties:
-            -   name: "type"
-                value:  "host"
-            -   name: "fields"
-                value: ["ip_src_addr", "ip_dst_addr"]
-            -   name: "adapter"
-                ref: "hostEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-#indexing
-    -   id: "indexAdapter"
-        className: "org.apache.metron.indexing.adapters.ESTimedRotatingAdapter"
-    -   id: "metricConfig"
-        className: "org.apache.commons.configuration.BaseConfiguration"
-        configMethods:
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.graphite"
-                    - "${org.apache.metron.metrics.reporter.graphite}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.console"
-                    - "${org.apache.metron.metrics.reporter.console}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.reporter.jmx"
-                    - "${org.apache.metron.metrics.reporter.jmx}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.address"
-                    - "${org.apache.metron.metrics.graphite.address}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.graphite.port"
-                    - "${org.apache.metron.metrics.graphite.port}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryParserBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryParserBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.acks"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.emits"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.GenericEnrichmentBolt.fails"
-                    - "${org.apache.metron.metrics.GenericEnrichmentBolt.fails}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.acks"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.acks}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.emits"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.emits}"
-            -   name: "setProperty"
-                args:
-                    - "org.apache.metron.metrics.TelemetryIndexingBolt.fails"
-                    - "${org.apache.metron.metrics.TelemetryIndexingBolt.fails}"
+            - "${bolt.hbase.table.name}"
+            - "${bolt.hbase.table.fields}"
     -   id: "zkHosts"
         className: "storm.kafka.ZkHosts"
         constructorArgs:
@@ -165,259 +46,25 @@ components:
                 value: true
             -   name: "startOffsetTime"
                 value: -1
-            -   name: "socketTimeoutMs"
-                value: 1000000
-#hbase bolt
-    -   id: "hbaseConfig"
-        className: "org.apache.metron.hbase.TupleTableConfig"
-        configMethods:
-            -   name: "withFields"
-                args:
-                    - "${bolt.hbase.table.fields}"
-            -   name: "withTable"
-                args:
-                    - "${bolt.hbase.table.name}"
-            -   name: "withRowKeyField"
-                args:
-                    - "${bolt.hbase.table.key.tuple.field.name}"
-            -   name: "withTimestampField"
-                args:
-                    - "${bolt.hbase.table.timestamp.tuple.field.name}"
-            -   name: "withBatch"
-                args:
-                    - ${bolt.hbase.enable.batching}
+
 spouts:
     -   id: "kafkaSpout"
         className: "storm.kafka.KafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
+
 bolts:
-    -   id: "hbaseBolt"
-        className: "org.apache.metron.hbase.HBaseBolt"
-        constructorArgs:
-            - ref: "hbaseConfig"
-            - "${kafka.zk}"
     -   id: "parserBolt"
-        className: "org.apache.metron.bolt.PcapParserBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageParser"
-                args:
-                    - ref: "parser"
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-    -   id: "indexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "pcap_index"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM.dd.hh"
-            -   name: "withDocumentName"
-                args:
-                    - "pcap_doc"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-    -   id: "errorIndexingBolt"
-        className: "org.apache.metron.indexing.TelemetryIndexingBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withIndexIP"
-                args:
-                    - "${es.ip}"
-            -   name: "withIndexPort"
-                args:
-                    - ${es.port}
-            -   name: "withClusterName"
-                args:
-                    - "${es.clustername}"
-            -   name: "withIndexName"
-                args:
-                    - "error"
-            -   name: "withIndexTimestamp"
-                args:
-                    - "yyyy.MM"
-            -   name: "withDocumentName"
-                args:
-                    - "pcap_error"
-            -   name: "withBulk"
-                args:
-                    - 1
-            -   name: "withIndexAdapter"
-                args:
-                    - ref: "indexAdapter"
-            -   name: "withMetricConfiguration"
-                args:
-                    - ref: "metricConfig"
-# Threat Intel Bolts
-    -   id: "threatIntelSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-    -   id: "ipThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "ipThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "threatIntelJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-# Enrichment Bolts
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
+        className: "org.apache.metron.bolt.ParserBolt"
         constructorArgs:
             - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "joinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-        -   name: "withEnrichments"
-            args:
-                - ref: "enrichments"
-        -   name: "withMaxCacheSize"
-            args: [10000]
-        -   name: "withMaxTimeRetain"
-            args: [10]
+            - "pcap"
+            - ref: "parser"
+            - ref: "writer"
 
 streams:
-#parser
-    -   name: "spout -> parser"
+    -   name: "spout -> bolt"
         from: "kafkaSpout"
         to: "parserBolt"
         grouping:
             type: SHUFFLE
-#hbase
-    -   name: "parser -> hbase"
-        from: "parserBolt"
-        to: "hbaseBolt"
-        grouping:
-            streamId: "raw"
-            type: FIELDS
-            args: ["key"]
-#enrichment
-    -   name: "parser -> host"
-        from: "parserBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-    -   name: "parser -> join"
-        from: "parserBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "joinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-
-#threat intel
-    -   name: "enrichmentJoin -> threatSplit"
-        from: "joinBolt"
-        to: "threatIntelSplitBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatSplit -> ip"
-        from: "threatIntelSplitBolt"
-        to: "ipThreatIntelBolt"
-        grouping:
-            streamId: "ip"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "ip -> join"
-        from: "ipThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "ip"
-            type: FIELDS
-            args: ["key"]
-    -   name: "threatIntelSplit -> threatIntelJoin"
-        from: "threatIntelSplitBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#indexing
-    -   name: "threatIntelJoin -> indexing"
-        from: "threatIntelJoinBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#errors
-    -   name: "parser -> errors"
-        from: "parserBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE
-    -   name: "indexing -> errors"
-        from: "indexingBolt"
-        to: "errorIndexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/test.yaml
new file mode 100644
index 0000000..a3a79fd
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/test.yaml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "pcap-test"
+config:
+    topology.workers: 1
+
+components:
+    -   id: "parser"
+        className: "org.apache.metron.parsing.parsers.PcapParser"
+    -   id: "writer"
+        className: "org.apache.metron.writer.PcapWriter"
+        constructorArgs:
+            - "${bolt.hbase.table.name}"
+            - "${bolt.hbase.table.fields}"
+        configMethods:
+            -   name: "withProviderImpl"
+                args:
+                    - "${hbase.provider.impl}"
+    -   id: "zkHosts"
+        className: "storm.kafka.ZkHosts"
+        constructorArgs:
+            - "${kafka.zk}"
+    -   id: "kafkaConfig"
+        className: "storm.kafka.SpoutConfig"
+        constructorArgs:
+            # zookeeper hosts
+            - ref: "zkHosts"
+            # topic name
+            - "${spout.kafka.topic.pcap}"
+            # zk root
+            - ""
+            # id
+            - "${spout.kafka.topic.pcap}"
+        properties:
+            -   name: "ignoreZkOffsets"
+                value: false
+            -   name: "startOffsetTime"
+                value: -2
+
+spouts:
+    -   id: "kafkaSpout"
+        className: "storm.kafka.KafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+
+bolts:
+    -   id: "parserBolt"
+        className: "org.apache.metron.bolt.ParserBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+            - "pcap"
+            - ref: "parser"
+            - ref: "writer"
+
+streams:
+    -   name: "spout -> bolt"
+        from: "kafkaSpout"
+        to: "parserBolt"
+        grouping:
+            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/.PCAPExampleOutput.crc
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/.PCAPExampleOutput.crc b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/.PCAPExampleOutput.crc
new file mode 100644
index 0000000..6e53497
Binary files /dev/null and b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/.PCAPExampleOutput.crc differ

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c737aa9d/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput
index 2de0edc..e730181 100644
Binary files a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput and b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput differ