You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:45:59 UTC

[11/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
new file mode 100644
index 0000000..7125382
--- /dev/null
+++ b/metron-platform/metron-pcap/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+  Foundation (ASF) under one or more contributor license agreements. See the
+  NOTICE file distributed with this work for additional information regarding
+  copyright ownership. The ASF licenses this file to You under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software distributed
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>metron-pcap</artifactId>
+    <description>Metron Pcap</description>
+    <properties>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_guava_version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>${global_json_simple_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.9.2</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>org.apache.curator</artifactId>
+                    <groupId>curator-client</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java
new file mode 100644
index 0000000..99945cb
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+
+/**
+* The Interface Constants.
+* 
+* @author sheetal
+* @version $Revision: 1.0 $
+*/
+public interface Constants {
+
+/** The protocol tcp. */
+public static final int PROTOCOL_TCP = 6;
+
+/** The protocol udp. */
+public static final int PROTOCOL_UDP = 17;
+
+/** The document key separator. */
+public static final char DOCUMENT_KEY_SEPARATOR = '-';
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java
new file mode 100644
index 0000000..27fae51
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+public class IEEE_802_1Q {
+
+	  int priorityCodePoint = 0;
+	  int dropEligibleIndicator = 0;
+	  int vLANIdentifier = 0;
+
+	  public IEEE_802_1Q(int priorityCodePoint, int dropEligibleIndicator,
+	      int vLANIdentifier) {
+	    this.priorityCodePoint = priorityCodePoint;
+	    this.dropEligibleIndicator = dropEligibleIndicator;
+	    this.vLANIdentifier = vLANIdentifier;
+	  }
+
+	  public int getPriorityCodePoint() {
+	    return priorityCodePoint;
+	  }
+
+	  public int getDropEligibleIndicator() {
+	    return dropEligibleIndicator;
+	  }
+
+	  public int getvLANIdentifier() {
+	    return vLANIdentifier;
+	  }
+	}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java
new file mode 100644
index 0000000..1609887
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetFrame;
+import org.krakenapps.pcap.decoder.ethernet.EthernetProcessor;
+import org.krakenapps.pcap.decoder.ethernet.MacAddress;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+public class MetronEthernetDecoder extends EthernetDecoder {
+
+  private Set<EthernetProcessor> callbacks;
+  private Map<Integer, Set<EthernetProcessor>> typeCallbacks;
+
+  public MetronEthernetDecoder() {
+    callbacks = new CopyOnWriteArraySet<EthernetProcessor>();
+    typeCallbacks = new ConcurrentHashMap<Integer, Set<EthernetProcessor>>();
+  }
+
+  public void register(EthernetProcessor processor) {
+    this.callbacks.add(processor);
+  }
+
+  public void register(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null) {
+      processors = new HashSet<EthernetProcessor>();
+      typeCallbacks.put(type, processors);
+    }
+
+    processors.add(processor);
+  }
+
+  public void unregister(EthernetProcessor processor) {
+    this.callbacks.remove(processor);
+  }
+
+  public void unregister(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null)
+      return;
+
+    processors.remove(processor);
+  }
+
+  public void decode(PcapPacket packet) {
+    // do not reorder following codes (parse sequence)
+    MacAddress destination = getMacAddress(packet.getPacketData());
+    MacAddress source = getMacAddress(packet.getPacketData());
+    int type = getEtherType(packet.getPacketData());
+
+    if (type == 0x8100) {
+      // It is 802.1Q VLAN tag
+      IEEE_802_1Q iee802_1qTag = get802_1qTag(packet.getPacketData());
+      // Now get the type
+      type = getEtherType(packet.getPacketData());
+    }
+
+    Buffer buffer = packet.getPacketData();
+    buffer.discardReadBytes();
+
+    EthernetFrame frame = new EthernetFrame(source, destination, type, buffer);
+    frame.setPcapPacket(packet);
+    dispatch(frame);
+  }
+
+  private MacAddress getMacAddress(Buffer data) {
+    byte[] mac = new byte[6];
+    data.gets(mac, 0, 6);
+    return new MacAddress(mac);
+  }
+
+  private int getEtherType(Buffer data) {
+    return ((int) data.getShort()) & 0x0000FFFF;
+  }
+
+  private IEEE_802_1Q get802_1qTag(Buffer data) {
+
+    // reference http://en.wikipedia.org/wiki/EtherType &
+    // http://en.wikipedia.org/wiki/IEEE_802.1Q
+    byte[] b802_1qTag = new byte[2];
+    data.gets(b802_1qTag, 0, 2);
+    BitSet bits = BitSet.valueOf(b802_1qTag);
+    int pcp = convertBitToInt(bits.get(0, 3));
+    int dei = convertBitToInt(bits.get(3, 4));
+    int vid = convertBitToInt(bits.get(4, 16));
+
+    return new IEEE_802_1Q(pcp, dei, vid);
+  }
+
+  public static int convertBitToInt(BitSet bits) {
+    int value = 0;
+    for (int i = 0; i < bits.length(); ++i) {
+      value += bits.get(i) ? (1 << i) : 0;
+    }
+    return value;
+  }
+
+  private void dispatch(EthernetFrame frame) {
+    for (EthernetProcessor processor : callbacks)
+      processor.process(frame);
+
+    Set<EthernetProcessor> processors = typeCallbacks.get(frame.getType());
+    if (processors == null)
+      return;
+
+    for (EthernetProcessor processor : processors)
+      processor.process(frame.dup());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
new file mode 100644
index 0000000..fcaf1b0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.text.MessageFormat;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+
+import org.apache.metron.pcap.utils.PcapUtils;
+
+/**
+ * The Class PacketInfo.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PacketInfo {
+
+  /** The packetHeader. */
+  private PacketHeader packetHeader = null;
+
+  /** The packet. */
+  private PcapPacket packet = null;
+
+  /** The ipv4 packet. */
+  private Ipv4Packet ipv4Packet = null;
+
+  /** The tcp packet. */
+  private TcpPacket tcpPacket = null;
+
+  /** The udp packet. */
+  private UdpPacket udpPacket = null;
+
+  /** The global header. */
+  private GlobalHeader globalHeader = null;
+
+  /** The Constant globalHeaderJsonTemplateSB. */
+  private static final StringBuffer globalHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant ipv4HeaderJsonTemplateSB. */
+  private static final StringBuffer ipv4HeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant tcpHeaderJsonTemplateSB. */
+  private static final StringBuffer tcpHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant udpHeaderJsonTemplateSB. */
+  private static final StringBuffer udpHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PacketInfo.class);
+  
+  static {
+    globalHeaderJsonTemplateSB.append("<\"global_header\":<\"pcap_id\":\"").append("{0}").append('"');
+    globalHeaderJsonTemplateSB.append(",\"inc_len\":").append("{1}");
+    globalHeaderJsonTemplateSB.append(",\"orig_len\":").append("{2}");
+    globalHeaderJsonTemplateSB.append(",\"ts_sec\":").append("{3}");
+    globalHeaderJsonTemplateSB.append(",\"ts_usec\":").append("{4}");
+    globalHeaderJsonTemplateSB.append(">,"); // NOPMD by sheetal on 1/29/14 2:37
+    // PM
+
+    // ipv4 header
+
+    ipv4HeaderJsonTemplateSB.append("\"ipv4_header\":");
+
+    ipv4HeaderJsonTemplateSB.append("\"ip_dst\":").append("{0}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_dst_addr\":\"").append("{1}");
+    ipv4HeaderJsonTemplateSB.append("\",\"ip_flags\":").append("{2}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_fragment_offset\":").append("{3}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_header_checksum\":").append("{4}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_id\":").append("{5}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_header_length\":").append("{6}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_protocol\":").append("{7}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_src\":").append("{8}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_src_addr\":\"").append("{9}");
+    ipv4HeaderJsonTemplateSB.append("\",\"ip_tos\":").append("{10}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_total_length\":").append("{11}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_ttl\":").append("{12}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_version\":").append("{13}");
+    ipv4HeaderJsonTemplateSB.append('>');
+
+    // tcp header
+    tcpHeaderJsonTemplateSB.append(",\"tcp_header\":<\"ack\":").append("{0}");
+    tcpHeaderJsonTemplateSB.append(",\"checksum\":").append("{1}");
+    tcpHeaderJsonTemplateSB.append(",\"data_length\":").append("{2}");
+    tcpHeaderJsonTemplateSB.append(",\"data_offset\":").append("{3}");
+    tcpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+    tcpHeaderJsonTemplateSB.append("\",\"dst_port\":").append("{5}");
+    tcpHeaderJsonTemplateSB.append(",\"direction\":").append("{6}");
+    tcpHeaderJsonTemplateSB.append(",\"flags\":").append("{7}");
+    tcpHeaderJsonTemplateSB.append(",\"reassembled_length \":").append("{8}");
+    tcpHeaderJsonTemplateSB.append(",\"relative_ack\":").append("{9}");
+    tcpHeaderJsonTemplateSB.append(",\"relative_seq\":").append("{10}");
+    tcpHeaderJsonTemplateSB.append(",\"seq\":").append("{11}");
+    tcpHeaderJsonTemplateSB.append(",\"session_key\":\"").append("{12}");
+    tcpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{13}");
+    tcpHeaderJsonTemplateSB.append("\",\"src_port\":").append("{14}");
+    tcpHeaderJsonTemplateSB.append(",\"total_length\":").append("{15}");
+    tcpHeaderJsonTemplateSB.append(",\"urgent_pointer\":").append("{16}");
+    tcpHeaderJsonTemplateSB.append(",\"window\":").append("{17}");
+    tcpHeaderJsonTemplateSB.append(">>");
+
+    // udp headers
+    udpHeaderJsonTemplateSB.append(",\"udp_header\":<\"checksum\":").append("{0}");
+    udpHeaderJsonTemplateSB.append(",\"dst_port\":").append("{1}");
+    udpHeaderJsonTemplateSB.append(",\"length\":").append("{2}");
+    udpHeaderJsonTemplateSB.append(",\"src_port\":").append("{3}");
+    udpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+    udpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{5}").append('"');
+    tcpHeaderJsonTemplateSB.append(">>");
+
+  }
+
+  /** The Constant globalHeaderJsonTemplateString. */
+  private static final String globalHeaderJsonTemplateString = globalHeaderJsonTemplateSB.toString();
+
+  /** The Constant ipv4HeaderJsonTemplateString. */
+  private static final String ipv4HeaderJsonTemplateString = ipv4HeaderJsonTemplateSB.toString();
+
+  /** The Constant tcpHeaderJsonTemplateString. */
+  private static final String tcpHeaderJsonTemplateString = tcpHeaderJsonTemplateSB.toString();
+
+  /** The Constant udpHeaderJsonTemplateString. */
+  private static final String udpHeaderJsonTemplateString = udpHeaderJsonTemplateSB.toString();
+
+  /**
+   * Instantiates a new packet info.
+   * 
+   * @param globalHeader
+   *          the global header
+   * @param packetHeader
+   *          the packet header
+   * @param packet
+   *          the packet
+   * @param ipv4Packet
+   *          the ipv4 packet
+   * @param tcpPacket
+   *          the tcp packet
+   * @param udpPacket
+   *          the udp packet
+   */
+  public PacketInfo(GlobalHeader globalHeader, PacketHeader packetHeader, PcapPacket packet, Ipv4Packet ipv4Packet, TcpPacket tcpPacket,
+      UdpPacket udpPacket) {
+    this.packetHeader = packetHeader;
+    this.packet = packet;
+    this.ipv4Packet = ipv4Packet;
+    this.tcpPacket = tcpPacket;
+    this.udpPacket = udpPacket;
+    this.globalHeader = globalHeader;
+  }
+
+  /**
+   * Gets the global header.
+   * 
+   * @return the global header
+   */
+  public GlobalHeader getGlobalHeader() {
+    return globalHeader;
+  }
+
+  /**
+   * Gets the packet header.
+   * 
+   * 
+   * @return the packet header
+   */
+  public PacketHeader getPacketHeader() {
+    return packetHeader;
+  }
+
+  /**
+   * Gets the packet.
+   * 
+   * 
+   * @return the packet
+   */
+  public PcapPacket getPacket() {
+    return packet;
+  }
+
+  /**
+   * Gets the ipv4 packet.
+   * 
+   * 
+   * @return the ipv4 packet
+   */
+  public Ipv4Packet getIpv4Packet() {
+    return ipv4Packet;
+  }
+
+  /**
+   * Gets the tcp packet.
+   * 
+   * 
+   * @return the tcp packet
+   */
+  public TcpPacket getTcpPacket() {
+    return tcpPacket;
+  }
+
+  /**
+   * Gets the udp packet.
+   * 
+   * 
+   * @return the udp packet
+   */
+  public UdpPacket getUdpPacket() {
+    return udpPacket;
+  }
+
+  /**
+   * Gets the key.
+   * 
+   * 
+   * @return the key
+   */
+  public String getKey() {
+    int sourcePort = 0;
+    int destinationPort = 0;
+    if (Constants.PROTOCOL_UDP == ipv4Packet.getProtocol()) {
+      sourcePort = udpPacket.getSourcePort();
+
+      destinationPort = udpPacket.getDestinationPort();
+
+    } else if (Constants.PROTOCOL_TCP == ipv4Packet.getProtocol()) {
+      sourcePort = tcpPacket.getSourcePort();
+
+      destinationPort = tcpPacket.getDestinationPort();
+
+    }
+
+    return PcapUtils.getSessionKey(ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet.getDestinationAddress().getHostAddress(),
+        ipv4Packet.getProtocol(), sourcePort, destinationPort, ipv4Packet.getId(), ipv4Packet.getFragmentOffset());
+
+  }
+
+  /**
+   * Gets the short key
+   * 
+   * 
+   * @return the short key
+   */
+  public String getShortKey() {
+	int sourcePort = 0;
+	int destinationPort = 0;
+	if(Constants.PROTOCOL_UDP == ipv4Packet.getProtocol()) {
+		sourcePort = udpPacket.getSourcePort();
+		destinationPort = udpPacket.getDestinationPort();
+	} else if (Constants.PROTOCOL_TCP == ipv4Packet.getProtocol()) {
+		sourcePort = tcpPacket.getSourcePort();
+		destinationPort = tcpPacket.getDestinationPort();
+	}
+	  
+	return PcapUtils.getShortSessionKey(ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet.getDestinationAddress().getHostAddress(),
+	    ipv4Packet.getProtocol(), sourcePort, destinationPort);
+			 
+  }
+  
+  /**
+   * Gets the json doc.
+   * 
+   * 
+   * @return the json doc
+   */
+  public String getJsonDoc() {
+
+    return getJsonDocUsingSBAppend();
+  }
+
+  /**
+   * Gets the json doc.
+   * 
+   * 
+   * @return the json doc
+   */
+  public String getJsonIndexDoc() {
+
+    return getJsonIndexDocUsingSBAppend();
+  }
+
+  /**
+   * Gets the json doc using sb append.
+   * 
+   * @return the json doc using sb append
+   */
+  private String getJsonDocUsingSBAppend() {
+
+	
+    StringBuffer jsonSb = new StringBuffer(1024);
+
+    // global header
+    jsonSb.append("{\"global_header\":{\"pcap_id\":\"").append(getKey());
+    jsonSb.append("\",\"inc_len\":").append(packetHeader.getInclLen());
+    jsonSb.append(",\"orig_len\":").append(packetHeader.getOrigLen());
+    jsonSb.append(",\"ts_sec\":").append(packetHeader.getTsSec());
+    jsonSb.append(",\"ts_usec\":").append(packetHeader.getTsUsec());
+    jsonSb.append("},"); // NOPMD by sheetal on 1/29/14 2:37 PM
+
+    // ipv4 header
+
+    jsonSb.append("\"ipv4_header\":{");
+
+    jsonSb.append("\"ip_dst\":").append(ipv4Packet.getDestination());
+    jsonSb.append(",\"ip_dst_addr\":\"").append(ipv4Packet.getDestinationAddress().getHostAddress());
+    jsonSb.append("\",\"ip_flags\":").append(ipv4Packet.getFlags());
+    jsonSb.append(",\"ip_fragment_offset\":").append(ipv4Packet.getFragmentOffset());
+    jsonSb.append(",\"ip_header_checksum\":").append(ipv4Packet.getHeaderChecksum());
+    jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+    jsonSb.append(",\"ip_header_length\":").append(ipv4Packet.getIhl());
+    jsonSb.append(",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+    jsonSb.append(",\"ip_src\":").append(ipv4Packet.getSource());
+    jsonSb.append(",\"ip_src_addr\":\"").append(ipv4Packet.getSourceAddress().getHostAddress());
+    jsonSb.append("\",\"ip_tos\":").append(ipv4Packet.getTos());
+    jsonSb.append(",\"ip_total_length\":").append(ipv4Packet.getTotalLength());
+    jsonSb.append(",\"ip_ttl\":").append(ipv4Packet.getTtl());
+    jsonSb.append(",\"ip_version\":").append(ipv4Packet.getVersion());
+    jsonSb.append('}');
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(",\"tcp_header\":{\"ack\":").append(tcpPacket.getAck());
+      jsonSb.append(",\"checksum\":").append(tcpPacket.getChecksum());
+      jsonSb.append(",\"data_length\":").append(tcpPacket.getDataLength());
+      jsonSb.append(",\"data_offset\":").append(tcpPacket.getDataOffset());
+      jsonSb.append(",\"dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"dst_port\":").append(tcpPacket.getDestinationPort());
+      jsonSb.append(",\"direction\":").append(tcpPacket.getDirection());
+      jsonSb.append(",\"flags\":").append(tcpPacket.getFlags());
+      jsonSb.append(",\"reassembled_length \":").append(tcpPacket.getReassembledLength());
+      jsonSb.append(",\"relative_ack\":").append(tcpPacket.getRelativeAck());
+      jsonSb.append(",\"relative_seq\":").append(tcpPacket.getRelativeSeq());
+      jsonSb.append(",\"seq\":").append(tcpPacket.getSeq());
+      jsonSb.append(",\"session_key\":\"").append(tcpPacket.getSessionKey());
+      jsonSb.append("\",\"src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"total_length\":").append(tcpPacket.getTotalLength());
+      jsonSb.append(",\"urgent_pointer\":").append(tcpPacket.getUrgentPointer());
+      jsonSb.append(",\"window\":").append(tcpPacket.getWindow());
+      jsonSb.append('}');
+    }
+
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(",\"udp_header\":{\"checksum\":").append(udpPacket.getChecksum());
+      jsonSb.append(",\"dst_port\":").append(udpPacket.getDestinationPort());
+      jsonSb.append(",\"length\":").append(udpPacket.getLength());
+      jsonSb.append(",\"src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\"}");
+    }
+
+    jsonSb.append('}');
+
+    return jsonSb.toString();
+  }
+
+  /**
+   * Gets the json doc using message format.
+   * 
+   * @return the json doc using message format
+   */
+  private String getJsonDocUsingMessageFormat() {
+
+    StringBuffer jsonSb = new StringBuffer(600);
+
+    jsonSb.append(MessageFormat.format(globalHeaderJsonTemplateString, getKey(), packetHeader.getInclLen(), packetHeader.getOrigLen(),
+        packetHeader.getTsSec(), packetHeader.getTsUsec()));
+
+    jsonSb.append(MessageFormat.format(ipv4HeaderJsonTemplateString, ipv4Packet.getDestination(), ipv4Packet.getDestinationAddress()
+        .getHostAddress(), ipv4Packet.getFlags(), ipv4Packet.getFragmentOffset(), ipv4Packet.getHeaderChecksum(), ipv4Packet.getId(),
+        ipv4Packet.getIhl(), ipv4Packet.getProtocol(), ipv4Packet.getSource(), ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet
+            .getTos(), ipv4Packet.getTotalLength(), ipv4Packet.getTtl(), ipv4Packet.getVersion()));
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(MessageFormat.format(tcpHeaderJsonTemplateString, tcpPacket.getAck(), tcpPacket.getChecksum(), tcpPacket
+          .getDataLength(), tcpPacket.getDataOffset(), tcpPacket.getDestinationAddress().getHostAddress(), tcpPacket.getDestinationPort(),
+          tcpPacket.getDirection(), tcpPacket.getFlags(), tcpPacket.getReassembledLength(), tcpPacket.getRelativeAck(), tcpPacket
+              .getRelativeSeq(), tcpPacket.getSeq(), tcpPacket.getSessionKey(), tcpPacket.getSourceAddress().getHostAddress(), tcpPacket
+              .getSourcePort(), tcpPacket.getTotalLength(), tcpPacket.getUrgentPointer(), tcpPacket.getWindow()));
+    } else
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(MessageFormat.format(udpHeaderJsonTemplateString, udpPacket.getChecksum(), udpPacket.getDestinationPort(),
+          udpPacket.getLength(), udpPacket.getSourcePort(), udpPacket.getDestination().getAddress().getHostAddress(), udpPacket.getSource()
+              .getAddress().getHostAddress()));
+
+    } else {
+      jsonSb.append('}');
+    }
+    return jsonSb.toString().replace('<', '{').replace('>', '}');
+  }
+
+  /**
+   * Gets the json index doc using sb append.
+   * 
+   * @return the json index doc using sb append
+   */
+  private String getJsonIndexDocUsingSBAppend() {
+
+	Long ts_micro = getPacketTimeInNanos() / 1000L;
+	StringBuffer jsonSb = new StringBuffer(175);
+
+	jsonSb.append("{\"pcap_id\":\"").append(getShortKey());
+    jsonSb.append("\",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+    jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+    jsonSb.append(",\"frag_offset\":").append(ipv4Packet.getFragmentOffset());
+    jsonSb.append(",\"ts_micro\":").append(ts_micro);
+
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(",\"ip_src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"ip_src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"ip_dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"ip_dst_port\":").append(tcpPacket.getDestinationPort());
+    }
+
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(",\"ip_src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\",\"ip_src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"ip_dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"ip_dst_port\":").append(udpPacket.getDestinationPort());
+    }
+
+    jsonSb.append('}');
+
+    return jsonSb.toString();
+  }
+  
+  public long getPacketTimeInNanos()
+  {
+	  if ( getGlobalHeader().getMagicNumber() == 0xa1b2c3d4 || getGlobalHeader().getMagicNumber() == 0xd4c3b2a1 )
+	  {
+		  //Time is in micro assemble as nano
+		  LOG.info("Times are in micro according to the magic number");
+		  return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ; 
+	  }
+	  else if ( getGlobalHeader().getMagicNumber() == 0xa1b23c4d || getGlobalHeader().getMagicNumber() == 0x4d3cb2a1 ) {
+		//Time is in nano assemble as nano
+		  LOG.info("Times are in nano according to the magic number");
+		  return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() ; 
+	  }
+	  //Default assume time is in micro assemble as nano
+	  LOG.warn("Unknown magic number. Defaulting to micro");
+	  return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ;  
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
new file mode 100644
index 0000000..e2d56c8
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.krakenapps.pcap.PcapInputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+import org.krakenapps.pcap.util.ByteOrderConverter;
+import org.krakenapps.pcap.util.ChainBuffer;
+
+/**
+ * The Class PcapByteInputStream.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteInputStream implements PcapInputStream {
+
+  /** The is. */
+  private DataInputStream is;
+
+  /** The global header. */
+  private GlobalHeader globalHeader;
+
+  /**
+   * Opens pcap file input stream.
+   * 
+   * @param pcap
+   *          the byte array to be read
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapByteInputStream(byte[] pcap) throws IOException {
+    is = new DataInputStream(new ByteArrayInputStream(pcap)); // $codepro.audit.disable
+                                                              // closeWhereCreated
+    readGlobalHeader();
+  }
+
+  /**
+   * Reads a packet from pcap byte array.
+   * 
+   * @return the packet throws IOException the stream has been closed and the
+   *         contained input stream does not support reading after close, or
+   *         another I/O error occurs. * @throws IOException Signals that an I/O
+   *         exception has occurred. * @see
+   *         org.krakenapps.pcap.PcapInputStream#getPacket()
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+
+  public PcapPacket getPacket() throws IOException {
+    return readPacket(globalHeader.getMagicNumber());
+  }
+
+  /**
+   * Gets the global header.
+   * 
+   * 
+   * @return the global header
+   */
+  public GlobalHeader getGlobalHeader() {
+    return globalHeader;
+  }
+
+  /**
+   * Read global header.
+   * 
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void readGlobalHeader() throws IOException {
+    int magic = is.readInt();
+    short major = is.readShort();
+    short minor = is.readShort();
+    int tz = is.readInt();
+    int sigfigs = is.readInt();
+    int snaplen = is.readInt();
+    int network = is.readInt();
+
+    globalHeader = new GlobalHeader(magic, major, minor, tz, sigfigs, snaplen,
+        network);
+
+    if (globalHeader.getMagicNumber() == 0xD4C3B2A1) {
+      globalHeader.swapByteOrder();
+    }
+  }
+
+  /**
+   * Read packet.
+   * 
+   * @param magicNumber
+   *          the magic number
+   * @return the pcap packet * @throws IOException Signals that an I/O exception
+   *         has occurred. * @throws EOFException the EOF exception
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private PcapPacket readPacket(int magicNumber) throws IOException {
+    PacketHeader packetHeader = readPacketHeader(magicNumber);
+    Buffer packetData = readPacketData(packetHeader.getInclLen());
+    return new PcapPacket(packetHeader, packetData);
+  }
+
+  /**
+   * Read packet header.
+   * 
+   * @param magicNumber
+   *          the magic number
+   * @return the packet header * @throws IOException Signals that an I/O
+   *         exception has occurred. * @throws EOFException the EOF exception
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private PacketHeader readPacketHeader(int magicNumber) throws IOException {
+    int tsSec = is.readInt();
+    int tsUsec = is.readInt();
+    int inclLen = is.readInt();
+    int origLen = is.readInt();
+
+    if (magicNumber == 0xD4C3B2A1) {
+      tsSec = ByteOrderConverter.swap(tsSec);
+      tsUsec = ByteOrderConverter.swap(tsUsec);
+      inclLen = ByteOrderConverter.swap(inclLen);
+      origLen = ByteOrderConverter.swap(origLen);
+    }
+
+    return new PacketHeader(tsSec, tsUsec, inclLen, origLen);
+  }
+
+  /**
+   * Read packet data.
+   * 
+   * @param packetLength
+   *          the packet length
+   * @return the buffer * @throws IOException Signals that an I/O exception has
+   *         occurred.
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private Buffer readPacketData(int packetLength) throws IOException {
+    byte[] packets = new byte[packetLength];
+    is.read(packets);
+
+    Buffer payload = new ChainBuffer();
+    payload.addLast(packets);
+    return payload;
+    // return new PacketPayload(packets);
+  }
+
+  /**
+   * Closes pcap stream handle.
+   * 
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred. * @see
+   *           org.krakenapps.pcap.PcapInputStream#close()
+   */
+
+  public void close() throws IOException {
+    is.close(); // $codepro.audit.disable closeInFinally
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java
new file mode 100644
index 0000000..06d6af6
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// $codepro.audit.disable explicitThisUsage, lossOfPrecisionInCast
+package org.apache.metron.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.PcapOutputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapByteOutputStream.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteOutputStream implements PcapOutputStream {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger
+      .getLogger(PcapByteOutputStream.class);
+
+  /** The Constant MAX_CACHED_PACKET_NUMBER. */
+  private static final int MAX_CACHED_PACKET_NUMBER = 1000;
+
+  /** The cached packet num. */
+  private int cachedPacketNum = 0; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /** The baos. */
+  private ByteArrayOutputStream baos; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /** The list. */
+  private List<Byte> list; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /**
+   * Instantiates a new pcap byte output stream.
+   * 
+   * @param baos
+   *          the baos
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos) {
+    this.baos = baos;
+    list = new ArrayList<Byte>();
+    createGlobalHeader();
+  }
+
+  /**
+   * Instantiates a new pcap byte output stream.
+   * 
+   * @param baos
+   *          the baos
+   * @param header
+   *          the header
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos, GlobalHeader header) {
+    this.baos = baos;
+    list = new ArrayList<Byte>();
+    copyGlobalHeader(header);
+  }
+
+  /**
+   * Creates the global header.
+   */
+  private void createGlobalHeader() {
+    /* magic number(swapped) */
+    list.add((byte) 0xd4);
+    list.add((byte) 0xc3);
+    list.add((byte) 0xb2);
+    list.add((byte) 0xa1);
+
+    /* major version number */
+    list.add((byte) 0x02);
+    list.add((byte) 0x00);
+
+    /* minor version number */
+    list.add((byte) 0x04);
+    list.add((byte) 0x00);
+
+    /* GMT to local correction */
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* accuracy of timestamps */
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* max length of captured packets, in octets */
+    list.add((byte) 0xff);
+    list.add((byte) 0xff);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* data link type(ethernet) */
+    list.add((byte) 0x01);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+  }
+
+  /**
+   * Copy global header.
+   * 
+   * @param header
+   *          the header
+   */
+  private void copyGlobalHeader(GlobalHeader header) {
+    final byte[] magicNumber = intToByteArray(header.getMagicNumber());
+    final byte[] majorVersion = shortToByteArray(header.getMajorVersion());
+    final byte[] minorVersion = shortToByteArray(header.getMinorVersion());
+    final byte[] zone = intToByteArray(header.getThiszone());
+    final byte[] sigFigs = intToByteArray(header.getSigfigs());
+    final byte[] snapLen = intToByteArray(header.getSnaplen());
+    final byte[] network = intToByteArray(header.getNetwork());
+
+    list.add(magicNumber[0]);
+    list.add(magicNumber[1]);
+    list.add(magicNumber[2]);
+    list.add(magicNumber[3]);
+
+    list.add(majorVersion[1]);
+    list.add(majorVersion[0]);
+
+    list.add(minorVersion[1]);
+    list.add(minorVersion[0]);
+
+    list.add(zone[3]);
+    list.add(zone[2]);
+    list.add(zone[1]);
+    list.add(zone[0]);
+
+    list.add(sigFigs[3]);
+    list.add(sigFigs[2]);
+    list.add(sigFigs[1]);
+    list.add(sigFigs[0]);
+
+    list.add(snapLen[3]);
+    list.add(snapLen[2]);
+    list.add(snapLen[1]);
+    list.add(snapLen[0]);
+
+    list.add(network[3]);
+    list.add(network[2]);
+    list.add(network[1]);
+    list.add(network[0]);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#write(org.krakenapps.pcap.packet
+   * .PcapPacket)
+   */
+  /**
+   * Method write.
+   * 
+   * @param packet
+   *          PcapPacket
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#write(PcapPacket) * @see
+   *           org.krakenapps.pcap.PcapOutputStream#write(PcapPacket)
+   */
+ 
+  public void write(PcapPacket packet) throws IOException {
+    PacketHeader packetHeader = packet.getPacketHeader();
+
+    int tsSec = packetHeader.getTsSec();
+    int tsUsec = packetHeader.getTsUsec();
+    int inclLen = packetHeader.getInclLen();
+    int origLen = packetHeader.getOrigLen();
+
+    addInt(tsSec);
+    addInt(tsUsec);
+    addInt(inclLen);
+    addInt(origLen);
+
+    Buffer payload = packet.getPacketData();
+
+    try {
+      payload.mark();
+      while (true) {
+        list.add(payload.get());
+      }
+    } catch (BufferUnderflowException e) {
+      //LOG.debug("Ignorable exception while writing packet", e);
+      payload.reset();
+    }
+
+    cachedPacketNum++;
+    if (cachedPacketNum == MAX_CACHED_PACKET_NUMBER) {
+      flush();
+    }
+  }
+
+  /**
+   * Adds the int.
+   * 
+   * @param number
+   *          the number
+   */
+  private void addInt(int number) {
+    list.add((byte) (number & 0xff));
+    list.add((byte) ((number & 0xff00) >> 8));
+    list.add((byte) ((number & 0xff0000) >> 16));
+    list.add((byte) ((number & 0xff000000) >> 24));
+  }
+
+  /**
+   * Int to byte array.
+   * 
+   * @param number
+   *          the number
+   * 
+   * @return the byte[]
+   */
+  private byte[] intToByteArray(int number) {
+    return new byte[] { (byte) (number >>> 24), (byte) (number >>> 16),
+        (byte) (number >>> 8), (byte) number };
+  }
+
+  /**
+   * Short to byte array.
+   * 
+   * @param number
+   *          the number
+   * 
+   * @return the byte[]
+   */
+  private byte[] shortToByteArray(short number) {
+    return new byte[] { (byte) (number >>> 8), (byte) number };
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+  /**
+   * Method flush.
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#flush() * @see
+   *           org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+ 
+  public void flush() throws IOException {
+    byte[] fileBinary = new byte[list.size()];
+    for (int i = 0; i < fileBinary.length; i++) {
+      fileBinary[i] = list.get(i);
+    }
+
+    list.clear();
+    baos.write(fileBinary);
+    cachedPacketNum = 0;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#close()
+   */
+  /**
+   * Method close.
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#close() * @see
+   *           org.krakenapps.pcap.PcapOutputStream#close()
+   */
+ 
+  public void close() throws IOException {
+    flush();
+    baos.close(); // $codepro.audit.disable closeInFinally
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java
new file mode 100644
index 0000000..48d25c7
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapMerger.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapMerger {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+  
+  /** The comparator for PcapPackets */
+  private static PcapPacketComparator PCAP_PACKET_COMPARATOR = new PcapPacketComparator();
+
+  /**
+   * Instantiates a new pcap merger.
+   */
+  private PcapMerger() { // $codepro.audit.disable emptyMethod
+  }
+
+  /**
+   * Merge two pcap byte arrays.
+   * 
+   * @param baos
+   *          the baos
+   * @param pcaps
+   *          the pcaps
+   * 
+   * @throws IOException
+   *           if there is no byte array, no access permission, or other io
+   *           related problems.
+   */
+  // public static void merge(byte[] to, byte[] from) throws IOException {
+  // PcapByteInputStream is = null;
+  // PcapByteOutputStream os = null;
+  // ByteArrayOutputStream baos = null;
+  // try {
+  // is = new PcapByteInputStream(from);
+  // baos = new ByteArrayOutputStream();
+  // os = new PcapByteOutputStream(baos, is.getGlobalHeader());
+  //
+  // writePacket(is, os);
+  // } finally {
+  // closeInput(is);
+  // if (baos != null) {
+  // baos.close();
+  // }
+  // closeOutput(os);
+  // }
+  // }
+
+  public static void merge(ByteArrayOutputStream baos, List<byte[]> pcaps)
+      throws IOException {
+    PcapByteInputStream is = null;
+    PcapByteOutputStream os = null;
+    ByteArrayOutputStream unsortedBaos = new ByteArrayOutputStream();
+    
+    try {
+      int i = 1;
+      for (byte[] pcap : pcaps) {
+        is = new PcapByteInputStream(pcap);
+        if (i == 1) {
+          os = new PcapByteOutputStream(unsortedBaos, is.getGlobalHeader());
+        }
+
+        writePacket(is, os);
+        i++;
+        closeInput(is);
+      }
+    } finally {
+      if (unsortedBaos != null) {
+        unsortedBaos.close();
+      }
+      closeOutput(os);
+      sort(baos, unsortedBaos.toByteArray());
+    }
+  }
+
+  /**
+   * Merge byte array1 with byte array2, and write to output byte array. It
+   * doesn't hurt original pcap dump byte arrays.
+   * 
+   * @param baos
+   *          the baos
+   * @param pcaps
+   *          the pcaps
+   * 
+   * @throws IOException
+   *           if there are no source byte arrays, have no read and/or write
+   *           permissions, or anything else.
+   */
+  public static void merge(ByteArrayOutputStream baos, byte[]... pcaps) // $codepro.audit.disable
+                                                                        // overloadedMethods
+      throws IOException {
+    merge(baos, Arrays.asList(pcaps));
+
+  }
+  
+  /**
+   * Sort the potentially unsorted byte array according to the timestamp
+   * in the packet header
+   * 
+   * @param unsortedBytes
+   * 	a byte array of a pcap file
+   * 
+   * @return byte array of a pcap file with packets in cronological order
+   * 
+   * @throws IOException
+   * 	if there are no source byte arrays, have no read and or write 
+   * 	permission, or anything else.
+   */
+  private static void sort(ByteArrayOutputStream baos, byte[] unsortedBytes) throws IOException {
+	  PcapByteInputStream pcapIs = new PcapByteInputStream(unsortedBytes);
+	  PcapByteOutputStream pcapOs = new PcapByteOutputStream(baos, pcapIs.getGlobalHeader());
+	  PcapPacket packet;
+	  ArrayList<PcapPacket> packetList = new ArrayList<PcapPacket>();
+	  
+	  try {
+		  while (true) {
+			  packet = pcapIs.getPacket();
+			  if (packet == null)
+				  break;
+			  packetList.add(packet);
+			  LOG.debug("Presort packet: " + packet.getPacketHeader().toString());
+		  }
+	  } catch (EOFException e) {
+		  //LOG.debug("Ignoreable exception in sort", e);
+	  }
+	  
+	  Collections.sort(packetList, PCAP_PACKET_COMPARATOR);
+	  for (PcapPacket p : packetList) {
+		  pcapOs.write(p);
+		  LOG.debug("Postsort packet: " + p.getPacketHeader().toString());
+	  }
+	  pcapOs.close();  
+  }
+  
+  /**
+   * Write packet.
+   * 
+   * @param is
+   *          the is
+   * @param os
+   *          the os
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private static void writePacket(PcapByteInputStream is,
+      PcapByteOutputStream os) throws IOException {
+    PcapPacket packet = null;
+    try {
+      while (true) {
+        packet = is.getPacket();
+        if (packet == null) {
+          break;
+        }
+        os.write(packet);
+      }
+    } catch (EOFException e) {
+      //LOG.debug("Ignorable exception in writePacket", e);
+    }
+
+  }
+
+  /**
+   * Close input.
+   * 
+   * @param is
+   *          the is
+   */
+  private static void closeInput(PcapByteInputStream is) {
+    if (is == null) {
+      return;
+    }
+    try {
+      is.close(); // $codepro.audit.disable closeInFinally
+    } catch (IOException e) {
+      LOG.error("Failed to close input stream", e);
+    }
+  }
+
+  /**
+   * Close output.
+   * 
+   * @param os
+   *          the os
+   */
+  private static void closeOutput(PcapByteOutputStream os) {
+    if (os == null) {
+      return;
+    }
+    try {
+      os.close();
+    } catch (IOException e) {
+      LOG.error("Failed to close output stream", e);
+
+    }
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static void main(String[] args) throws IOException {
+    byte[] b1 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.1.pcap"));
+    byte[] b2 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.2.pcap"));
+    byte[] b3 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.3.pcap"));
+
+    ByteArrayOutputStream boas = new ByteArrayOutputStream(); // $codepro.audit.disable
+                                                              // closeWhereCreated
+    PcapMerger.merge(boas, b1, b2, b3);
+
+    FileUtils.writeByteArrayToFile(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.automerged.1.2.pcap"),
+        boas.toByteArray(), false);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java
new file mode 100644
index 0000000..96f64a0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+
+public class PcapPacketComparator implements Comparator<PcapPacket> {
+
+	/** The Constant LOG. */
+	private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+	
+	public int compare(PcapPacket p1, PcapPacket p2) {
+
+		Long p1time = new Long(p1.getPacketHeader().getTsSec()) * 1000000L + new Long(p1.getPacketHeader().getTsUsec());
+		Long p2time = new Long(p2.getPacketHeader().getTsSec()) * 1000000L + new Long(p2.getPacketHeader().getTsUsec());
+		Long delta = p1time - p2time;
+		LOG.debug("p1time: " + p1time.toString() + " p2time: " + p2time.toString() + " delta: " + delta.toString());
+		return delta.intValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java
new file mode 100644
index 0000000..fcfcafd
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.spout;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+import storm.kafka.Callback;
+import storm.kafka.EmitContext;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+public class HDFSWriterCallback implements Callback {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+  public static final byte[] PCAP_GLOBAL_HEADER = new byte[] {
+          (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1, 0x02, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00
+          ,0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00
+  };
+
+  private static final List<Object> RET_TUPLE = ImmutableList.of((Object)Byte.valueOf((byte) 0x00), Byte.valueOf((byte)0x00));
+  private FileSystem fs;
+  private SequenceFile.Writer writer;
+  private HDFSWriterConfig config;
+  private long batchStartTime;
+  private long numWritten;
+  private EmitContext context;
+
+  public HDFSWriterCallback() {
+    //this.config = config;
+  }
+
+  public HDFSWriterCallback withConfig(HDFSWriterConfig config) {
+    LOG.info("Configured: " + config);
+    this.config = config;
+    return this;
+  }
+
+  @Override
+  public List<Object> apply(List<Object> tuple, EmitContext context) {
+
+    LongWritable ts = (LongWritable) tuple.get(0);
+    BytesWritable rawPacket = (BytesWritable)tuple.get(1);
+    try {
+      turnoverIfNecessary(ts.get());
+      writer.append(ts, headerize(rawPacket.getBytes()));
+      writer.hflush();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      //drop?  not sure..
+    }
+    return RET_TUPLE;
+  }
+
+  private static BytesWritable headerize(byte[] packet) {
+    byte[] ret = new byte[packet.length + PCAP_GLOBAL_HEADER.length];
+    int offset = 0;
+    System.arraycopy(PCAP_GLOBAL_HEADER, 0, ret, offset, PCAP_GLOBAL_HEADER.length);
+    offset += PCAP_GLOBAL_HEADER.length;
+    System.arraycopy(packet, 0, ret, offset, packet.length);
+    return new BytesWritable(ret);
+  }
+
+
+  private synchronized void turnoverIfNecessary(long ts) throws IOException {
+    long duration = ts - batchStartTime;
+    if(batchStartTime == 0L || duration > config.getMaxTimeMS() || numWritten > config.getNumPackets()) {
+      //turnover
+      Path path = getPath(ts);
+      if(writer != null) {
+        writer.close();
+      }
+      writer = SequenceFile.createWriter(new Configuration()
+              , SequenceFile.Writer.file(path)
+              , SequenceFile.Writer.keyClass(LongWritable.class)
+              , SequenceFile.Writer.valueClass(BytesWritable.class)
+      );
+      //reset state
+      LOG.info("Turning over and writing to " + path);
+      batchStartTime = ts;
+      numWritten = 0;
+    }
+  }
+
+  private Path getPath(long ts) {
+    String fileName = Joiner.on("_").join("pcap"
+            , "" + ts
+            , context.get(EmitContext.Type.UUID)
+    );
+    return new Path(config.getOutputPath(), fileName);
+  }
+
+  @Override
+  public void initialize(EmitContext context) {
+    this.context = context;
+    try {
+      fs = FileSystem.get(new Configuration());
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to create filesystem", e);
+    }
+  }
+
+  /**
+   * Closes this resource, relinquishing any underlying resources.
+   * This method is invoked automatically on objects managed by the
+   * {@code try}-with-resources statement.
+   * <p/>
+   * <p>While this interface method is declared to throw {@code
+   * Exception}, implementers are <em>strongly</em> encouraged to
+   * declare concrete implementations of the {@code close} method to
+   * throw more specific exceptions, or to throw no exception at all
+   * if the close operation cannot fail.
+   * <p/>
+   * <p><em>Implementers of this interface are also strongly advised
+   * to not have the {@code close} method throw {@link
+   * InterruptedException}.</em>
+   * <p/>
+   * This exception interacts with a thread's interrupted status,
+   * and runtime misbehavior is likely to occur if an {@code
+   * InterruptedException} is {@linkplain Throwable#addSuppressed
+   * suppressed}.
+   * <p/>
+   * More generally, if it would cause problems for an
+   * exception to be suppressed, the {@code AutoCloseable.close}
+   * method should not throw it.
+   * <p/>
+   * <p>Note that unlike the {@link Closeable#close close}
+   * method of {@link Closeable}, this {@code close} method
+   * is <em>not</em> required to be idempotent.  In other words,
+   * calling this {@code close} method more than once may have some
+   * visible side effect, unlike {@code Closeable.close} which is
+   * required to have no effect if called more than once.
+   * <p/>
+   * However, implementers of this interface are strongly encouraged
+   * to make their {@code close} methods idempotent.
+   *
+   * @throws Exception if this resource cannot be closed
+   */
+  @Override
+  public void close() throws Exception {
+    if(writer != null) {
+      writer.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java
new file mode 100644
index 0000000..60a23c2
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.spout;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HDFSWriterConfig implements Serializable {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  private long numPackets;
+  private long maxTimeMS;
+  private String outputPath;
+  private String zookeeperQuorum;
+
+  public HDFSWriterConfig withOutputPath(String path) {
+    outputPath = path;
+    return this;
+  }
+
+  public HDFSWriterConfig withNumPackets(long n) {
+    numPackets = n;
+    return this;
+  }
+
+  public HDFSWriterConfig withMaxTimeMS(long t) {
+    maxTimeMS = t;
+    return this;
+  }
+
+  public HDFSWriterConfig withZookeeperQuorum(String zookeeperQuorum) {
+    this.zookeeperQuorum = zookeeperQuorum;
+    return this;
+  }
+
+  public List<String> getZookeeperServers() {
+    List<String> out = new ArrayList<>();
+    if(zookeeperQuorum != null) {
+      for (String hostPort : Splitter.on(',').split(zookeeperQuorum)) {
+        Iterable<String> tokens = Splitter.on(':').split(hostPort);
+        String host = Iterables.getFirst(tokens, null);
+        if(host != null) {
+          out.add(host);
+        }
+      }
+    }
+    return out;
+  }
+
+  public Integer getZookeeperPort() {
+    if(zookeeperQuorum != null) {
+      String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
+      String portStr = Iterables.getLast(Splitter.on(':').split(hostPort));
+      return Integer.parseInt(portStr);
+    }
+    return  null;
+  }
+
+  public String getOutputPath() {
+    return outputPath;
+  }
+
+  public long getNumPackets() {
+    return numPackets;
+  }
+
+  public long getMaxTimeMS() {
+    return maxTimeMS;
+  }
+
+  @Override
+  public String toString() {
+    return "HDFSWriterConfig{" +
+            "numPackets=" + numPackets +
+            ", maxTimeMS=" + maxTimeMS +
+            ", outputPath='" + outputPath + '\'' +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java
new file mode 100644
index 0000000..48e99d2
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import org.json.simple.JSONObject;
+
+/**
+ * The Class PcapUtils.
+ */
+public class PcapUtils {
+
+  /** The Constant SESSION_KEY_SEPERATOR. */
+  private static final char SESSION_KEY_SEPERATOR = '-';
+
+  /** The Constant protocolIdToNameMap. */
+  private static final BiMap<Integer, String> protocolIdToNameMap = HashBiMap
+      .create();
+
+  // private static final Map<Integer, String> protocolIdToNameMap = new
+  // HashMap();
+
+  static {
+
+    protocolIdToNameMap.put(0, "HOPOPT");
+    protocolIdToNameMap.put(1, "ICMP");
+    protocolIdToNameMap.put(2, "IGMP");
+    protocolIdToNameMap.put(3, "GGP");
+    protocolIdToNameMap.put(4, "IPV4");
+    protocolIdToNameMap.put(5, "ST");
+    protocolIdToNameMap.put(6, "TCP");
+    protocolIdToNameMap.put(7, "CBT");
+    protocolIdToNameMap.put(8, "EGP");
+    protocolIdToNameMap.put(9, "IGP");
+    protocolIdToNameMap.put(10, "BBN-RCC-MON");
+    protocolIdToNameMap.put(11, "NVP-II");
+    protocolIdToNameMap.put(12, "PUP");
+    protocolIdToNameMap.put(13, "ARGUS");
+    protocolIdToNameMap.put(14, "EMCON");
+    protocolIdToNameMap.put(15, "XNET");
+    protocolIdToNameMap.put(16, "CHAOS");
+    protocolIdToNameMap.put(17, "UDP");
+    protocolIdToNameMap.put(18, "MUX");
+    protocolIdToNameMap.put(19, "DCN-MEAS");
+    protocolIdToNameMap.put(20, "HMP");
+    protocolIdToNameMap.put(21, "PRM");
+    protocolIdToNameMap.put(22, "XNS-IDP");
+    protocolIdToNameMap.put(23, "TRUNK-1");
+    protocolIdToNameMap.put(24, "TRUNK-2");
+    protocolIdToNameMap.put(25, "LEAF-1");
+    protocolIdToNameMap.put(26, "LEAF-2");
+    protocolIdToNameMap.put(27, "RDP");
+    protocolIdToNameMap.put(28, "IRTP");
+    protocolIdToNameMap.put(29, "ISO-TP4");
+    protocolIdToNameMap.put(30, "NETBLT");
+    protocolIdToNameMap.put(31, "MFE-NSP");
+    protocolIdToNameMap.put(32, "MERIT-INP");
+    protocolIdToNameMap.put(33, "DCCP");
+    protocolIdToNameMap.put(34, "3PC");
+    protocolIdToNameMap.put(35, "IDPR");
+    protocolIdToNameMap.put(36, "XTP");
+    protocolIdToNameMap.put(37, "DDP");
+    protocolIdToNameMap.put(38, "IDPR-CMTP");
+    protocolIdToNameMap.put(39, "TP++");
+    protocolIdToNameMap.put(40, "IL");
+    protocolIdToNameMap.put(41, "IPV6");
+    protocolIdToNameMap.put(42, "SDRP");
+    protocolIdToNameMap.put(43, "IPV6-ROUTE");
+    protocolIdToNameMap.put(44, "IPV6-FRAG");
+    protocolIdToNameMap.put(45, "IDRP");
+    protocolIdToNameMap.put(46, "RSVP");
+    protocolIdToNameMap.put(47, "GRE");
+    protocolIdToNameMap.put(48, "DSR");
+    protocolIdToNameMap.put(49, "BNA");
+    protocolIdToNameMap.put(50, "ESP");
+    protocolIdToNameMap.put(51, "AH");
+    protocolIdToNameMap.put(52, "I-NLSP");
+    protocolIdToNameMap.put(53, "SWIPE");
+    protocolIdToNameMap.put(54, "NARP");
+    protocolIdToNameMap.put(55, "MOBILE");
+    protocolIdToNameMap.put(56, "TLSP");
+    protocolIdToNameMap.put(57, "SKIP");
+    protocolIdToNameMap.put(58, "IPV6-ICMP");
+    protocolIdToNameMap.put(59, "IPV6-NONXT");
+    protocolIdToNameMap.put(60, "IPV6-OPTS");
+    protocolIdToNameMap.put(62, "CFTP");
+    protocolIdToNameMap.put(64, "SAT-EXPAK");
+    protocolIdToNameMap.put(65, "KRYPTOLAN");
+    protocolIdToNameMap.put(66, "RVD");
+    protocolIdToNameMap.put(67, "IPPC");
+    protocolIdToNameMap.put(69, "SAT-MON");
+    protocolIdToNameMap.put(70, "VISA");
+    protocolIdToNameMap.put(71, "IPCV");
+    protocolIdToNameMap.put(72, "CPNX");
+    protocolIdToNameMap.put(73, "CPHB");
+    protocolIdToNameMap.put(74, "WSN");
+    protocolIdToNameMap.put(75, "PVP");
+    protocolIdToNameMap.put(76, "BR-SAT-MON");
+    protocolIdToNameMap.put(77, "SUN-ND");
+    protocolIdToNameMap.put(78, "WB-MON");
+    protocolIdToNameMap.put(79, "WB-EXPAK");
+    protocolIdToNameMap.put(80, "ISO-IP");
+    protocolIdToNameMap.put(81, "VMTP");
+    protocolIdToNameMap.put(82, "SECURE-VMTP");
+    protocolIdToNameMap.put(83, "VINES");
+    protocolIdToNameMap.put(84, "TTP");
+    protocolIdToNameMap.put(85, "NSFNET-IGP");
+    protocolIdToNameMap.put(86, "DGP");
+    protocolIdToNameMap.put(87, "TCF");
+    protocolIdToNameMap.put(88, "EIGRP");
+    protocolIdToNameMap.put(89, "OSPFIGP");
+    protocolIdToNameMap.put(90, "SPRITE-RPC");
+    protocolIdToNameMap.put(91, "LARP");
+    protocolIdToNameMap.put(92, "MTP");
+    protocolIdToNameMap.put(93, "AX.25");
+    protocolIdToNameMap.put(94, "IPIP");
+    protocolIdToNameMap.put(95, "MICP");
+    protocolIdToNameMap.put(96, "SCC-SP");
+    protocolIdToNameMap.put(97, "ETHERIP");
+    protocolIdToNameMap.put(98, "ENCAP");
+    protocolIdToNameMap.put(100, "GMTP");
+    protocolIdToNameMap.put(101, "IFMP");
+    protocolIdToNameMap.put(102, "PNNI");
+    protocolIdToNameMap.put(103, "PIM");
+    protocolIdToNameMap.put(104, "ARIS");
+    protocolIdToNameMap.put(105, "SCPS");
+    protocolIdToNameMap.put(106, "QNX");
+    protocolIdToNameMap.put(107, "A/N");
+    protocolIdToNameMap.put(108, "IPCOMP");
+    protocolIdToNameMap.put(109, "SNP");
+    protocolIdToNameMap.put(110, "COMPAQ-PEER");
+    protocolIdToNameMap.put(111, "IPX-IN-IP");
+    protocolIdToNameMap.put(112, "VRRP");
+    protocolIdToNameMap.put(113, "PGM");
+    protocolIdToNameMap.put(115, "L2TP");
+    protocolIdToNameMap.put(116, "DDX");
+    protocolIdToNameMap.put(117, "IATP");
+    protocolIdToNameMap.put(118, "STP");
+    protocolIdToNameMap.put(119, "SRP");
+    protocolIdToNameMap.put(120, "UTI");
+    protocolIdToNameMap.put(121, "SMP");
+    protocolIdToNameMap.put(122, "SM");
+    protocolIdToNameMap.put(123, "PTP");
+    protocolIdToNameMap.put(124, "ISIS OVER IPV4");
+    protocolIdToNameMap.put(125, "FIRE");
+    protocolIdToNameMap.put(126, "CRTP");
+    protocolIdToNameMap.put(127, "CRUDP");
+    protocolIdToNameMap.put(128, "SSCOPMCE");
+    protocolIdToNameMap.put(129, "IPLT");
+    protocolIdToNameMap.put(130, "SPS");
+    protocolIdToNameMap.put(131, "PIPE");
+    protocolIdToNameMap.put(132, "SCTP");
+    protocolIdToNameMap.put(133, "FC");
+    protocolIdToNameMap.put(134, "RSVP-E2E-IGNORE");
+    protocolIdToNameMap.put(135, "MOBILITY HEADER");
+    protocolIdToNameMap.put(136, "UDPLITE");
+    protocolIdToNameMap.put(137, "MPLS-IN-IP");
+    protocolIdToNameMap.put(138, "MANET");
+    protocolIdToNameMap.put(139, "HIP");
+    protocolIdToNameMap.put(140, "SHIM6");
+    protocolIdToNameMap.put(141, "WESP");
+    protocolIdToNameMap.put(142, "ROHC");
+  }
+
+  /** The Constant protocolNameToIdMap. */
+  private static final BiMap<String, Integer> protocolNameToIdMap = protocolIdToNameMap
+      .inverse();
+
+  // private static final Map<String, Integer> protocolNameToIdMap =
+  // invertMap(protocolIdToNameMap);
+
+  /**
+   * Convert ipv4 ip to hex.
+   * 
+   * @param ipAddress
+   *          the ip address
+   * @return the string
+   */
+  public static String convertIpv4IpToHex(String ipAddress) {
+    StringBuffer hexIp = new StringBuffer(64);
+    String[] ipSegments = ipAddress.split("\\.");
+
+    for (String ipSegment : ipSegments) {
+      hexIp.append(convertIpSegmentToHex(ipSegment));
+    }
+
+    return hexIp.toString();
+
+  }
+
+  public static String convertHexToIpv4Ip(String hex) {
+    List<Integer> ipSegments = new ArrayList<>();
+    for(int i = 0; i < hex.length(); i += 2) {
+      String segment = hex.substring(i, i + 2);
+      ipSegments.add(Integer.parseInt(segment, 16));
+    }
+    return Joiner.on(".").join(ipSegments);
+  }
+
+  /**
+   * Gets the session key.
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort) {
+    return getSessionKey(srcIp, dstIp, protocol, srcPort, dstPort, null, null);
+  }
+
+  /**
+   * Gets the session key.
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @param ipId
+   *          the ip id
+   * @param fragmentOffset
+   *          the fragment offset
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort, String ipId,
+      String fragmentOffset) {
+
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(SESSION_KEY_SEPERATOR)
+        .append(convertIpv4IpToHex(dstIp)).append(SESSION_KEY_SEPERATOR)
+        .append(protocol == null ? "0" : protocol)
+        .append(SESSION_KEY_SEPERATOR).append(srcPort == null ? "0" : srcPort)
+        .append(SESSION_KEY_SEPERATOR).append(dstPort == null ? "0" : dstPort)
+        .append(SESSION_KEY_SEPERATOR).append(ipId == null ? "0" : ipId)
+        .append(SESSION_KEY_SEPERATOR)
+        .append(fragmentOffset == null ? "0" : fragmentOffset);
+
+    return sb.toString();
+  }
+
+  public static String getSessionKey(JSONObject message) {
+    String srcIp = (String) message.get("ip_src_addr");
+    String dstIp = (String) message.get("ip_dst_addr");
+    Long protocol = (Long) message.get("ip_protocol");
+    Long srcPort = (Long) message.get("ip_src_port");
+    Long dstPort = (Long) message.get("ip_dst_port");
+    Long ipId = (Long) message.get("ip_id");
+    String ipIdString = ipId == null ? null : ipId.toString();
+    Long fragmentOffset = (Long) message.get("frag_offset");
+    String fragmentOffsetString = fragmentOffset == null ? null : fragmentOffset.toString();
+    return PcapUtils.getSessionKey(srcIp, dstIp, protocol.toString(), srcPort.toString(), dstPort.toString(), ipIdString, fragmentOffsetString);
+  }
+
+  public static String getPartialSessionKey(String srcIp, String dstIp,
+                                            String protocol, String srcPort, String dstPort) {
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(SESSION_KEY_SEPERATOR)
+            .append(convertIpv4IpToHex(dstIp)).append(SESSION_KEY_SEPERATOR)
+            .append(protocol == null ? "0" : protocol)
+            .append(SESSION_KEY_SEPERATOR).append(srcPort == null ? "0" : srcPort)
+            .append(SESSION_KEY_SEPERATOR).append(dstPort == null ? "0" : dstPort);
+    return sb.toString();
+  }
+
+  /**
+   * Gets the session key.
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @param ipId
+   *          the ip id
+   * @param fragmentOffset
+   *          the fragment offset
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp, int protocol,
+      int srcPort, int dstPort, int ipId, int fragmentOffset) {
+    String keySeperator = "-";
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(keySeperator)
+        .append(convertIpv4IpToHex(dstIp)).append(keySeperator)
+        .append(protocol).append(keySeperator).append(srcPort)
+        .append(keySeperator).append(dstPort).append(keySeperator).append(ipId)
+        .append(keySeperator).append(fragmentOffset);
+
+    return sb.toString();
+  }
+
+  /**
+   * Gets the short session key. (5-tuple only)
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @return the session key
+   */
+  public static String getShortSessionKey(String srcIp, String dstIp, int protocol,
+      int srcPort, int dstPort) {
+    String keySeperator = "-";
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(keySeperator)
+        .append(convertIpv4IpToHex(dstIp)).append(keySeperator)
+        .append(protocol).append(keySeperator).append(srcPort)
+        .append(keySeperator).append(dstPort);
+
+    return sb.toString();
+  }
+  
+  // public static String convertPortToHex(String portNumber) {
+  // return convertPortToHex(Integer.valueOf(portNumber));
+  //
+  // }
+  //
+  // public static String convertPortToHex(int portNumber) {
+  // return convertToHex(portNumber, 4);
+  //
+  // }
+  //
+  // public static String convertProtocolToHex(String protocol) {
+  // return convertProtocolToHex(Integer.valueOf(protocol));
+  //
+  // }
+  //
+  // public static String convertProtocolToHex(int protocol) {
+  // return convertToHex(protocol, 2);
+  // }
+
+  /**
+   * Convert ip segment to hex.
+   * 
+   * @param ipSegment
+   *          the ip segment
+   * @return the string
+   */
+  public static String convertIpSegmentToHex(String ipSegment) {
+    return convertIpSegmentToHex(Integer.valueOf(ipSegment));
+
+  }
+
+  /**
+   * Convert ip segment to hex.
+   * 
+   * @param ipSegment
+   *          the ip segment
+   * @return the string
+   */
+  public static String convertIpSegmentToHex(int ipSegment) {
+    return convertToHex(ipSegment, 2);
+
+  }
+
+  /**
+   * Convert to hex.
+   * 
+   * @param number
+   *          the number
+   * @param length
+   *          the length
+   * @return the string
+   */
+  public static String convertToHex(int number, int length) {
+    return StringUtils.leftPad(Integer.toHexString(number), length, '0');
+
+  }
+
+  /**
+   * Gets the protocol name.
+   * 
+   * @param protocolNumber
+   *          the protocol number
+   * 
+   * @return the protocol name
+   */
+  public static String getProtocolNameFromId(int protocolNumber) {
+    String protocolName = protocolIdToNameMap.get(protocolNumber);
+
+    if (protocolName == null) {
+      protocolName = String.valueOf(protocolNumber);
+    }
+    return protocolName;
+  }
+
+  /**
+   * Gets the protocol id from name.
+   * 
+   * @param protocolName
+   *          the protocol name
+   * @return the protocol id from name
+   */
+  public static int getProtocolIdFromName(String protocolName) {
+    Integer protocolNumber = protocolNameToIdMap
+        .get(protocolName.toUpperCase());
+
+    if (protocolNumber == null) {
+      protocolNumber = -1;
+    }
+    return protocolNumber;
+  }
+
+  /**
+   * Invert map.
+   * 
+   * @param <V>
+   *          the value type
+   * @param <K>
+   *          the key type
+   * @param map
+   *          the map
+   * @return the map
+   */
+  private static <V, K> Map<V, K> invertMap(Map<K, V> map) {
+
+    Map<V, K> inv = new HashMap<V, K>();
+
+    for (Entry<K, V> entry : map.entrySet())
+      inv.put(entry.getValue(), entry.getKey());
+
+    return inv;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
new file mode 100644
index 0000000..c07a217
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.hbase.writer.HBaseWriter;
+import org.apache.metron.pcap.utils.PcapUtils;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PcapWriter extends HBaseWriter {
+
+  private String column;
+
+  public PcapWriter(String tableName, String column) {
+    super(tableName);
+    this.column = column;
+  }
+
+  @Override
+  public byte[] getKey(Tuple tuple, JSONObject message) {
+    String key = PcapUtils.getSessionKey(message);
+    return key.getBytes();
+  }
+
+  @Override
+  public long getTimestamp(Tuple tuple, JSONObject message) {
+    return (long) message.get("ts_micro");
+  }
+
+  @Override
+  public Map<String, byte[]> getValues(Tuple tuple, JSONObject message) {
+    Map<String, byte[]> values = new HashMap<>();
+    values.put(column, tuple.getBinary(0));
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
new file mode 100644
index 0000000..ff05c29
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Callback extends AutoCloseable, Serializable {
+  List<Object> apply(List<Object> tuple, EmitContext context);
+  void initialize(EmitContext context);
+}