You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2016/04/26 16:45:59 UTC
[11/51] [partial] incubator-metron git commit: METRON-113 Project
Reorganization (merrimanr) closes apache/incubator-metron#88
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
new file mode 100644
index 0000000..7125382
--- /dev/null
+++ b/metron-platform/metron-pcap/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>metron-platform</artifactId>
+    <version>0.1BETA</version>
+  </parent>
+  <artifactId>metron-pcap</artifactId>
+  <description>Metron Pcap</description>
+  <properties>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.metron</groupId>
+      <artifactId>metron-hbase</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_guava_version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>${global_json_simple_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${global_hadoop_version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${global_storm_version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.9.2</artifactId>
+      <version>${global_kafka_version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-kafka</artifactId>
+      <version>${global_storm_version}</version>
+      <exclusions>
+        <!-- groupId/artifactId were swapped here, so the exclusion matched nothing. -->
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java
new file mode 100644
index 0000000..99945cb
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Constants.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+
+/**
+ * Constants shared by the pcap classes: the protocol numbers that
+ * {@code PacketInfo} compares against the IPv4 protocol field, and the
+ * document key separator.
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public interface Constants {
+
+  /** Protocol number identifying TCP. */
+  int PROTOCOL_TCP = 6;
+
+  /** Protocol number identifying UDP. */
+  int PROTOCOL_UDP = 17;
+
+  /** The document key separator. */
+  char DOCUMENT_KEY_SEPARATOR = '-';
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java
new file mode 100644
index 0000000..27fae51
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/IEEE_802_1Q.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+/**
+ * Value holder for the three fields carried by an IEEE 802.1Q VLAN tag.
+ * The values are stored exactly as handed to the constructor.
+ */
+public class IEEE_802_1Q {
+
+  // Package-visible on purpose: visibility is unchanged from the original class.
+  int priorityCodePoint;
+  int dropEligibleIndicator;
+  int vLANIdentifier;
+
+  /**
+   * Creates a tag value.
+   *
+   * @param priorityCodePoint the priority code point (PCP)
+   * @param dropEligibleIndicator the drop eligible indicator (DEI)
+   * @param vLANIdentifier the VLAN identifier (VID)
+   */
+  public IEEE_802_1Q(int priorityCodePoint, int dropEligibleIndicator,
+      int vLANIdentifier) {
+    this.vLANIdentifier = vLANIdentifier;
+    this.dropEligibleIndicator = dropEligibleIndicator;
+    this.priorityCodePoint = priorityCodePoint;
+  }
+
+  /** @return the priority code point */
+  public int getPriorityCodePoint() {
+    return this.priorityCodePoint;
+  }
+
+  /** @return the drop eligible indicator */
+  public int getDropEligibleIndicator() {
+    return this.dropEligibleIndicator;
+  }
+
+  /** @return the VLAN identifier */
+  public int getvLANIdentifier() {
+    return this.vLANIdentifier;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java
new file mode 100644
index 0000000..1609887
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/MetronEthernetDecoder.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetFrame;
+import org.krakenapps.pcap.decoder.ethernet.EthernetProcessor;
+import org.krakenapps.pcap.decoder.ethernet.MacAddress;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+/**
+ * Ethernet decoder that reads the destination MAC, source MAC and EtherType
+ * from a packet, steps over one IEEE 802.1Q VLAN tag when the EtherType is
+ * 0x8100, and dispatches the resulting frame to the registered processors.
+ *
+ * <p>Processors are kept in copy-on-write sets so that {@code dispatch} can
+ * iterate while other threads register or unregister processors.
+ */
+public class MetronEthernetDecoder extends EthernetDecoder {
+
+  /** EtherType value that announces an IEEE 802.1Q VLAN tag. */
+  private static final int ETHERTYPE_802_1Q = 0x8100;
+
+  /** Processors invoked for every decoded frame. */
+  private final Set<EthernetProcessor> callbacks;
+
+  /** Processors invoked only for frames of a given EtherType. */
+  private final ConcurrentHashMap<Integer, Set<EthernetProcessor>> typeCallbacks;
+
+  public MetronEthernetDecoder() {
+    callbacks = new CopyOnWriteArraySet<EthernetProcessor>();
+    typeCallbacks = new ConcurrentHashMap<Integer, Set<EthernetProcessor>>();
+  }
+
+  /**
+   * Registers a processor that receives every decoded frame.
+   *
+   * @param processor the processor to add
+   */
+  public void register(EthernetProcessor processor) {
+    this.callbacks.add(processor);
+  }
+
+  /**
+   * Registers a processor that receives only frames of the given EtherType.
+   *
+   * @param type the EtherType to listen for
+   * @param processor the processor to add
+   */
+  public void register(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null) {
+      // putIfAbsent keeps a set created concurrently by another thread
+      // instead of overwriting it (and losing its registrations).
+      Set<EthernetProcessor> created = new CopyOnWriteArraySet<EthernetProcessor>();
+      processors = typeCallbacks.putIfAbsent(type, created);
+      if (processors == null) {
+        processors = created;
+      }
+    }
+
+    processors.add(processor);
+  }
+
+  /**
+   * Removes a processor previously registered for all frames.
+   *
+   * @param processor the processor to remove
+   */
+  public void unregister(EthernetProcessor processor) {
+    this.callbacks.remove(processor);
+  }
+
+  /**
+   * Removes a processor previously registered for the given EtherType.
+   * Does nothing when no processor was ever registered for that type.
+   *
+   * @param type the EtherType the processor was registered for
+   * @param processor the processor to remove
+   */
+  public void unregister(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null) {
+      return;
+    }
+
+    processors.remove(processor);
+  }
+
+  /**
+   * Decodes the Ethernet header of the packet and dispatches the frame.
+   *
+   * @param packet the packet whose data buffer is positioned at the Ethernet header
+   */
+  public void decode(PcapPacket packet) {
+    // do not reorder following codes (parse sequence)
+    MacAddress destination = getMacAddress(packet.getPacketData());
+    MacAddress source = getMacAddress(packet.getPacketData());
+    int type = getEtherType(packet.getPacketData());
+
+    if (type == ETHERTYPE_802_1Q) {
+      // It is 802.1Q VLAN tag. The tag value is not used, but it must be
+      // read so that the buffer advances past its two bytes.
+      get802_1qTag(packet.getPacketData());
+      // Now get the type
+      type = getEtherType(packet.getPacketData());
+    }
+
+    Buffer buffer = packet.getPacketData();
+    buffer.discardReadBytes();
+
+    EthernetFrame frame = new EthernetFrame(source, destination, type, buffer);
+    frame.setPcapPacket(packet);
+    dispatch(frame);
+  }
+
+  /** Reads the next six bytes of the buffer as a MAC address. */
+  private MacAddress getMacAddress(Buffer data) {
+    byte[] mac = new byte[6];
+    data.gets(mac, 0, 6);
+    return new MacAddress(mac);
+  }
+
+  /** Reads the next short of the buffer as an unsigned 16-bit EtherType. */
+  private int getEtherType(Buffer data) {
+    return ((int) data.getShort()) & 0x0000FFFF;
+  }
+
+  /**
+   * Reads the two-byte tag control information of an 802.1Q tag.
+   *
+   * <p>The field is laid out most-significant bit first: 3 bits PCP, 1 bit
+   * DEI, 12 bits VID (see http://en.wikipedia.org/wiki/IEEE_802.1Q). It is
+   * decoded with shifts and masks because {@code BitSet.valueOf(byte[])}
+   * numbers bits from the least-significant bit of each byte, which does not
+   * match that layout.
+   *
+   * @param data buffer positioned at the tag control information
+   * @return the decoded tag
+   */
+  private IEEE_802_1Q get802_1qTag(Buffer data) {
+    byte[] b802_1qTag = new byte[2];
+    data.gets(b802_1qTag, 0, 2);
+
+    int tci = ((b802_1qTag[0] & 0xFF) << 8) | (b802_1qTag[1] & 0xFF);
+    int pcp = (tci >>> 13) & 0x07;
+    int dei = (tci >>> 12) & 0x01;
+    int vid = tci & 0x0FFF;
+
+    return new IEEE_802_1Q(pcp, dei, vid);
+  }
+
+  /**
+   * Converts a bit set to an int, treating bit index {@code i} as the value
+   * {@code 1 << i}.
+   *
+   * @param bits the bits to convert
+   * @return the sum of {@code 1 << i} over all set bits
+   */
+  public static int convertBitToInt(BitSet bits) {
+    int value = 0;
+    for (int i = 0; i < bits.length(); ++i) {
+      value += bits.get(i) ? (1 << i) : 0;
+    }
+    return value;
+  }
+
+  /**
+   * Hands the frame to every general processor, then a duplicate of the
+   * frame to every processor registered for the frame's EtherType.
+   */
+  private void dispatch(EthernetFrame frame) {
+    for (EthernetProcessor processor : callbacks) {
+      processor.process(frame);
+    }
+
+    Set<EthernetProcessor> processors = typeCallbacks.get(frame.getType());
+    if (processors == null) {
+      return;
+    }
+
+    for (EthernetProcessor processor : processors) {
+      processor.process(frame.dup());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
new file mode 100644
index 0000000..fcaf1b0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PacketInfo.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.text.MessageFormat;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+
+import org.apache.metron.pcap.utils.PcapUtils;
+
+/**
+ * The Class PacketInfo.
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PacketInfo {
+
+ /** Per-packet capture header; supplies inc_len, orig_len, ts_sec and ts_usec for the JSON output. */
+ private PacketHeader packetHeader = null;
+
+ /** The captured packet; only stored and returned by {@link #getPacket()}. */
+ private PcapPacket packet = null;
+
+ /** The decoded IPv4 layer; dereferenced unconditionally by the key and JSON methods. */
+ private Ipv4Packet ipv4Packet = null;
+
+ /** The decoded TCP layer; the JSON methods skip the TCP section when this is null. */
+ private TcpPacket tcpPacket = null;
+
+ /** The decoded UDP layer; the JSON methods skip the UDP section when this is null. */
+ private UdpPacket udpPacket = null;
+
+ /** The capture's global header; its magic number selects the timestamp resolution. */
+ private GlobalHeader globalHeader = null;
+
+ /** Builder for the global header MessageFormat template; filled by the static initializer. */
+ private static final StringBuffer globalHeaderJsonTemplateSB = new StringBuffer();
+
+ /** Builder for the IPv4 header MessageFormat template; filled by the static initializer. */
+ private static final StringBuffer ipv4HeaderJsonTemplateSB = new StringBuffer();
+
+ /** Builder for the TCP header MessageFormat template; filled by the static initializer. */
+ private static final StringBuffer tcpHeaderJsonTemplateSB = new StringBuffer();
+
+ /** Builder for the UDP header MessageFormat template; filled by the static initializer. */
+ private static final StringBuffer udpHeaderJsonTemplateSB = new StringBuffer();
+
+ /** Logger used by {@link #getPacketTimeInNanos()}. */
+ private static final Logger LOG = Logger.getLogger(PacketInfo.class);
+
+ static {
+   // Builds the MessageFormat templates used by getJsonDocUsingMessageFormat().
+   // '<' and '>' stand in for '{' and '}' because braces are MessageFormat
+   // syntax; that method swaps them back after formatting.
+
+   // global header: opens the root object and the "global_header" object
+   globalHeaderJsonTemplateSB.append("<\"global_header\":<\"pcap_id\":\"").append("{0}").append('"');
+   globalHeaderJsonTemplateSB.append(",\"inc_len\":").append("{1}");
+   globalHeaderJsonTemplateSB.append(",\"orig_len\":").append("{2}");
+   globalHeaderJsonTemplateSB.append(",\"ts_sec\":").append("{3}");
+   globalHeaderJsonTemplateSB.append(",\"ts_usec\":").append("{4}");
+   globalHeaderJsonTemplateSB.append(">,"); // NOPMD by sheetal on 1/29/14 2:37 PM
+
+   // ipv4 header (the opening '<' was missing, leaving the object unopened)
+   ipv4HeaderJsonTemplateSB.append("\"ipv4_header\":<");
+
+   ipv4HeaderJsonTemplateSB.append("\"ip_dst\":").append("{0}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_dst_addr\":\"").append("{1}");
+   ipv4HeaderJsonTemplateSB.append("\",\"ip_flags\":").append("{2}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_fragment_offset\":").append("{3}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_header_checksum\":").append("{4}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_id\":").append("{5}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_header_length\":").append("{6}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_protocol\":").append("{7}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_src\":").append("{8}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_src_addr\":\"").append("{9}");
+   ipv4HeaderJsonTemplateSB.append("\",\"ip_tos\":").append("{10}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_total_length\":").append("{11}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_ttl\":").append("{12}");
+   ipv4HeaderJsonTemplateSB.append(",\"ip_version\":").append("{13}");
+   ipv4HeaderJsonTemplateSB.append('>');
+
+   // tcp header: the trailing ">>" closes "tcp_header" and the root object
+   tcpHeaderJsonTemplateSB.append(",\"tcp_header\":<\"ack\":").append("{0}");
+   tcpHeaderJsonTemplateSB.append(",\"checksum\":").append("{1}");
+   tcpHeaderJsonTemplateSB.append(",\"data_length\":").append("{2}");
+   tcpHeaderJsonTemplateSB.append(",\"data_offset\":").append("{3}");
+   tcpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+   tcpHeaderJsonTemplateSB.append("\",\"dst_port\":").append("{5}");
+   tcpHeaderJsonTemplateSB.append(",\"direction\":").append("{6}");
+   tcpHeaderJsonTemplateSB.append(",\"flags\":").append("{7}");
+   tcpHeaderJsonTemplateSB.append(",\"reassembled_length \":").append("{8}");
+   tcpHeaderJsonTemplateSB.append(",\"relative_ack\":").append("{9}");
+   tcpHeaderJsonTemplateSB.append(",\"relative_seq\":").append("{10}");
+   tcpHeaderJsonTemplateSB.append(",\"seq\":").append("{11}");
+   tcpHeaderJsonTemplateSB.append(",\"session_key\":\"").append("{12}");
+   tcpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{13}");
+   tcpHeaderJsonTemplateSB.append("\",\"src_port\":").append("{14}");
+   tcpHeaderJsonTemplateSB.append(",\"total_length\":").append("{15}");
+   tcpHeaderJsonTemplateSB.append(",\"urgent_pointer\":").append("{16}");
+   tcpHeaderJsonTemplateSB.append(",\"window\":").append("{17}");
+   tcpHeaderJsonTemplateSB.append(">>");
+
+   // udp headers: the trailing ">>" closes "udp_header" and the root object.
+   // It used to be appended to the TCP template by mistake, which left the
+   // UDP document unterminated and over-closed the TCP one.
+   udpHeaderJsonTemplateSB.append(",\"udp_header\":<\"checksum\":").append("{0}");
+   udpHeaderJsonTemplateSB.append(",\"dst_port\":").append("{1}");
+   udpHeaderJsonTemplateSB.append(",\"length\":").append("{2}");
+   udpHeaderJsonTemplateSB.append(",\"src_port\":").append("{3}");
+   udpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+   udpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{5}").append('"');
+   udpHeaderJsonTemplateSB.append(">>");
+
+ }
+
+ /**
+ * Snapshot of the global header template. These four constants must stay
+ * textually after the static initializer block: static initialization runs
+ * in source order, so moving them above it would capture empty strings.
+ */
+ private static final String globalHeaderJsonTemplateString = globalHeaderJsonTemplateSB.toString();
+
+ /** Snapshot of the IPv4 header template, passed to MessageFormat by getJsonDocUsingMessageFormat(). */
+ private static final String ipv4HeaderJsonTemplateString = ipv4HeaderJsonTemplateSB.toString();
+
+ /** Snapshot of the TCP header template, passed to MessageFormat by getJsonDocUsingMessageFormat(). */
+ private static final String tcpHeaderJsonTemplateString = tcpHeaderJsonTemplateSB.toString();
+
+ /** Snapshot of the UDP header template, passed to MessageFormat by getJsonDocUsingMessageFormat(). */
+ private static final String udpHeaderJsonTemplateString = udpHeaderJsonTemplateSB.toString();
+
+ /**
+  * Bundles the capture headers of one packet with its decoded protocol layers.
+  * The arguments are stored as given; nothing is copied or validated.
+  *
+  * @param globalHeader
+  *          the global header of the capture
+  * @param packetHeader
+  *          the header of this packet
+  * @param packet
+  *          the captured packet
+  * @param ipv4Packet
+  *          the decoded IPv4 layer
+  * @param tcpPacket
+  *          the decoded TCP layer, or null
+  * @param udpPacket
+  *          the decoded UDP layer, or null
+  */
+ public PacketInfo(GlobalHeader globalHeader, PacketHeader packetHeader, PcapPacket packet, Ipv4Packet ipv4Packet, TcpPacket tcpPacket,
+     UdpPacket udpPacket) {
+   // capture-level data
+   this.globalHeader = globalHeader;
+   this.packetHeader = packetHeader;
+   this.packet = packet;
+   // decoded layers
+   this.ipv4Packet = ipv4Packet;
+   this.tcpPacket = tcpPacket;
+   this.udpPacket = udpPacket;
+ }
+
+ /**
+  * Returns the global header passed to the constructor.
+  *
+  * @return the global header
+  */
+ public GlobalHeader getGlobalHeader() {
+   return this.globalHeader;
+ }
+
+ /**
+  * Returns the packet header passed to the constructor.
+  *
+  * @return the packet header
+  */
+ public PacketHeader getPacketHeader() {
+   return this.packetHeader;
+ }
+
+ /**
+  * Returns the captured packet passed to the constructor.
+  *
+  * @return the packet
+  */
+ public PcapPacket getPacket() {
+   return this.packet;
+ }
+
+ /**
+  * Returns the decoded IPv4 layer passed to the constructor.
+  *
+  * @return the ipv4 packet
+  */
+ public Ipv4Packet getIpv4Packet() {
+   return this.ipv4Packet;
+ }
+
+ /**
+  * Returns the decoded TCP layer passed to the constructor.
+  *
+  * @return the tcp packet, possibly null
+  */
+ public TcpPacket getTcpPacket() {
+   return this.tcpPacket;
+ }
+
+ /**
+  * Returns the decoded UDP layer passed to the constructor.
+  *
+  * @return the udp packet, possibly null
+  */
+ public UdpPacket getUdpPacket() {
+   return this.udpPacket;
+ }
+
+ /**
+  * Builds the full session key for this packet by passing the source and
+  * destination addresses, protocol, ports, IP id and fragment offset to
+  * {@code PcapUtils.getSessionKey}. Ports are taken from the UDP or TCP
+  * layer depending on the IPv4 protocol field and are 0 for any other
+  * protocol.
+  *
+  * @return the key
+  */
+ public String getKey() {
+   int srcPort = 0;
+   int dstPort = 0;
+
+   if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
+     srcPort = udpPacket.getSourcePort();
+     dstPort = udpPacket.getDestinationPort();
+   } else if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+     srcPort = tcpPacket.getSourcePort();
+     dstPort = tcpPacket.getDestinationPort();
+   }
+
+   final String srcAddress = ipv4Packet.getSourceAddress().getHostAddress();
+   final String dstAddress = ipv4Packet.getDestinationAddress().getHostAddress();
+
+   return PcapUtils.getSessionKey(srcAddress, dstAddress, ipv4Packet.getProtocol(), srcPort, dstPort, ipv4Packet.getId(),
+       ipv4Packet.getFragmentOffset());
+ }
+
+ /**
+  * Builds the short session key for this packet by passing the source and
+  * destination addresses, protocol and ports to
+  * {@code PcapUtils.getShortSessionKey}. Ports are taken from the UDP or
+  * TCP layer depending on the IPv4 protocol field and are 0 for any other
+  * protocol.
+  *
+  * @return the short key
+  */
+ public String getShortKey() {
+   int srcPort = 0;
+   int dstPort = 0;
+
+   if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
+     srcPort = udpPacket.getSourcePort();
+     dstPort = udpPacket.getDestinationPort();
+   } else if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+     srcPort = tcpPacket.getSourcePort();
+     dstPort = tcpPacket.getDestinationPort();
+   }
+
+   final String srcAddress = ipv4Packet.getSourceAddress().getHostAddress();
+   final String dstAddress = ipv4Packet.getDestinationAddress().getHostAddress();
+
+   return PcapUtils.getShortSessionKey(srcAddress, dstAddress, ipv4Packet.getProtocol(), srcPort, dstPort);
+ }
+
+ /**
+  * Returns the full JSON document for this packet, produced by the
+  * append-based builder.
+  *
+  * @return the json doc
+  */
+ public String getJsonDoc() {
+   return this.getJsonDocUsingSBAppend();
+ }
+
+ /**
+  * Returns the JSON index document for this packet, produced by the
+  * append-based builder.
+  *
+  * @return the json index doc
+  */
+ public String getJsonIndexDoc() {
+   return this.getJsonIndexDocUsingSBAppend();
+ }
+
+ /**
+ * Gets the json doc using sb append.
+ *
+ * @return the json doc using sb append
+ */
+ private String getJsonDocUsingSBAppend() {
+
+
+ StringBuffer jsonSb = new StringBuffer(1024);
+
+ // global header
+ jsonSb.append("{\"global_header\":{\"pcap_id\":\"").append(getKey());
+ jsonSb.append("\",\"inc_len\":").append(packetHeader.getInclLen());
+ jsonSb.append(",\"orig_len\":").append(packetHeader.getOrigLen());
+ jsonSb.append(",\"ts_sec\":").append(packetHeader.getTsSec());
+ jsonSb.append(",\"ts_usec\":").append(packetHeader.getTsUsec());
+ jsonSb.append("},"); // NOPMD by sheetal on 1/29/14 2:37 PM
+
+ // ipv4 header
+
+ jsonSb.append("\"ipv4_header\":{");
+
+ jsonSb.append("\"ip_dst\":").append(ipv4Packet.getDestination());
+ jsonSb.append(",\"ip_dst_addr\":\"").append(ipv4Packet.getDestinationAddress().getHostAddress());
+ jsonSb.append("\",\"ip_flags\":").append(ipv4Packet.getFlags());
+ jsonSb.append(",\"ip_fragment_offset\":").append(ipv4Packet.getFragmentOffset());
+ jsonSb.append(",\"ip_header_checksum\":").append(ipv4Packet.getHeaderChecksum());
+ jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+ jsonSb.append(",\"ip_header_length\":").append(ipv4Packet.getIhl());
+ jsonSb.append(",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+ jsonSb.append(",\"ip_src\":").append(ipv4Packet.getSource());
+ jsonSb.append(",\"ip_src_addr\":\"").append(ipv4Packet.getSourceAddress().getHostAddress());
+ jsonSb.append("\",\"ip_tos\":").append(ipv4Packet.getTos());
+ jsonSb.append(",\"ip_total_length\":").append(ipv4Packet.getTotalLength());
+ jsonSb.append(",\"ip_ttl\":").append(ipv4Packet.getTtl());
+ jsonSb.append(",\"ip_version\":").append(ipv4Packet.getVersion());
+ jsonSb.append('}');
+
+ // tcp header
+ if (tcpPacket != null) {
+ jsonSb.append(",\"tcp_header\":{\"ack\":").append(tcpPacket.getAck());
+ jsonSb.append(",\"checksum\":").append(tcpPacket.getChecksum());
+ jsonSb.append(",\"data_length\":").append(tcpPacket.getDataLength());
+ jsonSb.append(",\"data_offset\":").append(tcpPacket.getDataOffset());
+ jsonSb.append(",\"dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+ jsonSb.append("\",\"dst_port\":").append(tcpPacket.getDestinationPort());
+ jsonSb.append(",\"direction\":").append(tcpPacket.getDirection());
+ jsonSb.append(",\"flags\":").append(tcpPacket.getFlags());
+ jsonSb.append(",\"reassembled_length \":").append(tcpPacket.getReassembledLength());
+ jsonSb.append(",\"relative_ack\":").append(tcpPacket.getRelativeAck());
+ jsonSb.append(",\"relative_seq\":").append(tcpPacket.getRelativeSeq());
+ jsonSb.append(",\"seq\":").append(tcpPacket.getSeq());
+ jsonSb.append(",\"session_key\":\"").append(tcpPacket.getSessionKey());
+ jsonSb.append("\",\"src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+ jsonSb.append("\",\"src_port\":").append(tcpPacket.getSourcePort());
+ jsonSb.append(",\"total_length\":").append(tcpPacket.getTotalLength());
+ jsonSb.append(",\"urgent_pointer\":").append(tcpPacket.getUrgentPointer());
+ jsonSb.append(",\"window\":").append(tcpPacket.getWindow());
+ jsonSb.append('}');
+ }
+
+ // udp headers
+ if (udpPacket != null) {
+ jsonSb.append(",\"udp_header\":{\"checksum\":").append(udpPacket.getChecksum());
+ jsonSb.append(",\"dst_port\":").append(udpPacket.getDestinationPort());
+ jsonSb.append(",\"length\":").append(udpPacket.getLength());
+ jsonSb.append(",\"src_port\":").append(udpPacket.getSourcePort());
+ jsonSb.append(",\"dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+ jsonSb.append("\",\"src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+ jsonSb.append("\"}");
+ }
+
+ jsonSb.append('}');
+
+ return jsonSb.toString();
+ }
+
+ /**
+ * Gets the json doc using message format.
+ *
+ * @return the json doc using message format
+ */
+ private String getJsonDocUsingMessageFormat() {
+
+ StringBuffer jsonSb = new StringBuffer(600);
+
+ jsonSb.append(MessageFormat.format(globalHeaderJsonTemplateString, getKey(), packetHeader.getInclLen(), packetHeader.getOrigLen(),
+ packetHeader.getTsSec(), packetHeader.getTsUsec()));
+
+ jsonSb.append(MessageFormat.format(ipv4HeaderJsonTemplateString, ipv4Packet.getDestination(), ipv4Packet.getDestinationAddress()
+ .getHostAddress(), ipv4Packet.getFlags(), ipv4Packet.getFragmentOffset(), ipv4Packet.getHeaderChecksum(), ipv4Packet.getId(),
+ ipv4Packet.getIhl(), ipv4Packet.getProtocol(), ipv4Packet.getSource(), ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet
+ .getTos(), ipv4Packet.getTotalLength(), ipv4Packet.getTtl(), ipv4Packet.getVersion()));
+
+ // tcp header
+ if (tcpPacket != null) {
+ jsonSb.append(MessageFormat.format(tcpHeaderJsonTemplateString, tcpPacket.getAck(), tcpPacket.getChecksum(), tcpPacket
+ .getDataLength(), tcpPacket.getDataOffset(), tcpPacket.getDestinationAddress().getHostAddress(), tcpPacket.getDestinationPort(),
+ tcpPacket.getDirection(), tcpPacket.getFlags(), tcpPacket.getReassembledLength(), tcpPacket.getRelativeAck(), tcpPacket
+ .getRelativeSeq(), tcpPacket.getSeq(), tcpPacket.getSessionKey(), tcpPacket.getSourceAddress().getHostAddress(), tcpPacket
+ .getSourcePort(), tcpPacket.getTotalLength(), tcpPacket.getUrgentPointer(), tcpPacket.getWindow()));
+ } else
+ // udp headers
+ if (udpPacket != null) {
+ jsonSb.append(MessageFormat.format(udpHeaderJsonTemplateString, udpPacket.getChecksum(), udpPacket.getDestinationPort(),
+ udpPacket.getLength(), udpPacket.getSourcePort(), udpPacket.getDestination().getAddress().getHostAddress(), udpPacket.getSource()
+ .getAddress().getHostAddress()));
+
+ } else {
+ jsonSb.append('}');
+ }
+ return jsonSb.toString().replace('<', '{').replace('>', '}');
+ }
+
+ /**
+ * Gets the json index doc using sb append.
+ *
+ * @return the json index doc using sb append
+ */
+ private String getJsonIndexDocUsingSBAppend() {
+
+ Long ts_micro = getPacketTimeInNanos() / 1000L;
+ StringBuffer jsonSb = new StringBuffer(175);
+
+ jsonSb.append("{\"pcap_id\":\"").append(getShortKey());
+ jsonSb.append("\",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+ jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+ jsonSb.append(",\"frag_offset\":").append(ipv4Packet.getFragmentOffset());
+ jsonSb.append(",\"ts_micro\":").append(ts_micro);
+
+
+ // tcp header
+ if (tcpPacket != null) {
+ jsonSb.append(",\"ip_src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+ jsonSb.append("\",\"ip_src_port\":").append(tcpPacket.getSourcePort());
+ jsonSb.append(",\"ip_dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+ jsonSb.append("\",\"ip_dst_port\":").append(tcpPacket.getDestinationPort());
+ }
+
+ // udp headers
+ if (udpPacket != null) {
+ jsonSb.append(",\"ip_src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+ jsonSb.append("\",\"ip_src_port\":").append(udpPacket.getSourcePort());
+ jsonSb.append(",\"ip_dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+ jsonSb.append("\",\"ip_dst_port\":").append(udpPacket.getDestinationPort());
+ }
+
+ jsonSb.append('}');
+
+ return jsonSb.toString();
+ }
+
+ public long getPacketTimeInNanos()
+ {
+ if ( getGlobalHeader().getMagicNumber() == 0xa1b2c3d4 || getGlobalHeader().getMagicNumber() == 0xd4c3b2a1 )
+ {
+ //Time is in micro assemble as nano
+ LOG.info("Times are in micro according to the magic number");
+ return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ;
+ }
+ else if ( getGlobalHeader().getMagicNumber() == 0xa1b23c4d || getGlobalHeader().getMagicNumber() == 0x4d3cb2a1 ) {
+ //Time is in nano assemble as nano
+ LOG.info("Times are in nano according to the magic number");
+ return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() ;
+ }
+ //Default assume time is in micro assemble as nano
+ LOG.warn("Unknown magic number. Defaulting to micro");
+ return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
new file mode 100644
index 0000000..e2d56c8
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteInputStream.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.krakenapps.pcap.PcapInputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+import org.krakenapps.pcap.util.ByteOrderConverter;
+import org.krakenapps.pcap.util.ChainBuffer;
+
+/**
+ * The Class PcapByteInputStream.
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteInputStream implements PcapInputStream {
+
+  /** Microsecond-resolution magic number as seen when the capture's byte order is the reverse of the reader's. */
+  private static final int SWAPPED_MAGIC_MICROS = 0xD4C3B2A1;
+
+  /** Nanosecond-resolution magic number as seen when the capture's byte order is the reverse of the reader's. */
+  private static final int SWAPPED_MAGIC_NANOS = 0x4D3CB2A1;
+
+  /** Big-endian reader over the in-memory capture. */
+  private DataInputStream is;
+
+  /** The global header parsed from the first 24 bytes of the capture. */
+  private GlobalHeader globalHeader;
+
+  /** True when multi-byte header fields must be byte-swapped after reading. */
+  private boolean swapped;
+
+  /**
+   * Opens a pcap stream over a byte array and reads its global header.
+   *
+   * @param pcap
+   *          the complete pcap capture to be read
+   *
+   * @throws IOException
+   *           if the array is too short to contain a global header
+   */
+  public PcapByteInputStream(byte[] pcap) throws IOException {
+    is = new DataInputStream(new ByteArrayInputStream(pcap)); // $codepro.audit.disable closeWhereCreated
+    readGlobalHeader();
+  }
+
+  /**
+   * Reads the next packet (record header plus captured bytes).
+   *
+   * @return the next packet
+   * @throws IOException
+   *           an {@code EOFException} when no complete packet remains, or
+   *           another I/O error for a malformed record
+   * @see org.krakenapps.pcap.PcapInputStream#getPacket()
+   */
+  public PcapPacket getPacket() throws IOException {
+    PacketHeader packetHeader = readPacketHeader();
+    Buffer packetData = readPacketData(packetHeader.getInclLen());
+    return new PcapPacket(packetHeader, packetData);
+  }
+
+  /**
+   * Gets the global header.
+   *
+   * @return the global header read when this stream was constructed
+   */
+  public GlobalHeader getGlobalHeader() {
+    return globalHeader;
+  }
+
+  /**
+   * Reads the global header and records whether the capture's byte order
+   * differs from the big-endian order used by {@link DataInputStream}.
+   *
+   * @throws IOException
+   *           if fewer than 24 bytes are available
+   */
+  private void readGlobalHeader() throws IOException {
+    int magic = is.readInt();
+    short major = is.readShort();
+    short minor = is.readShort();
+    int tz = is.readInt();
+    int sigfigs = is.readInt();
+    int snaplen = is.readInt();
+    int network = is.readInt();
+
+    globalHeader = new GlobalHeader(magic, major, minor, tz, sigfigs, snaplen,
+        network);
+
+    // Decide from the raw magic, once, so that record headers are swapped
+    // consistently; both timestamp resolutions have a swapped form.
+    swapped = magic == SWAPPED_MAGIC_MICROS || magic == SWAPPED_MAGIC_NANOS;
+    if (swapped) {
+      globalHeader.swapByteOrder();
+    }
+  }
+
+  /**
+   * Reads one 16-byte record header, byte-swapping its fields when the
+   * capture was written in the opposite byte order.
+   *
+   * @return the packet header
+   * @throws IOException
+   *           an {@code EOFException} if the stream ends before a full header
+   */
+  private PacketHeader readPacketHeader() throws IOException {
+    int tsSec = is.readInt();
+    int tsUsec = is.readInt();
+    int inclLen = is.readInt();
+    int origLen = is.readInt();
+
+    if (swapped) {
+      tsSec = ByteOrderConverter.swap(tsSec);
+      tsUsec = ByteOrderConverter.swap(tsUsec);
+      inclLen = ByteOrderConverter.swap(inclLen);
+      origLen = ByteOrderConverter.swap(origLen);
+    }
+
+    return new PacketHeader(tsSec, tsUsec, inclLen, origLen);
+  }
+
+  /**
+   * Reads exactly {@code packetLength} bytes of captured packet data.
+   *
+   * @param packetLength
+   *          the captured length taken from the record header
+   * @return a buffer holding the packet bytes
+   * @throws IOException
+   *           if the length is negative, or an {@code EOFException} if the
+   *           stream ends before the whole packet has been read
+   */
+  private Buffer readPacketData(int packetLength) throws IOException {
+    if (packetLength < 0) {
+      throw new IOException("Invalid captured packet length: " + packetLength);
+    }
+    byte[] packets = new byte[packetLength];
+    // readFully fails on a truncated record instead of silently returning a
+    // partially filled (zero padded) packet.
+    is.readFully(packets);
+
+    Buffer payload = new ChainBuffer();
+    payload.addLast(packets);
+    return payload;
+  }
+
+  /**
+   * Closes the underlying stream.
+   *
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see org.krakenapps.pcap.PcapInputStream#close()
+   */
+  public void close() throws IOException {
+    is.close(); // $codepro.audit.disable closeInFinally
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java
new file mode 100644
index 0000000..06d6af6
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapByteOutputStream.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// $codepro.audit.disable explicitThisUsage, lossOfPrecisionInCast
+package org.apache.metron.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.PcapOutputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapByteOutputStream.
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteOutputStream implements PcapOutputStream {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger
+      .getLogger(PcapByteOutputStream.class);
+
+  /** Number of packets buffered before an automatic flush. */
+  private static final int MAX_CACHED_PACKET_NUMBER = 1000;
+
+  /** Microsecond-resolution pcap magic number in its unswapped form. */
+  private static final int MAGIC_MICROS = 0xA1B2C3D4;
+
+  /** Nanosecond-resolution pcap magic number in its unswapped form. */
+  private static final int MAGIC_NANOS = 0xA1B23C4D;
+
+  /**
+   * Default little-endian global header: microsecond magic, version 2.4,
+   * zero time-zone correction, zero timestamp accuracy, snap length 65535
+   * and link type 1 (ethernet).
+   */
+  private static final byte[] DEFAULT_GLOBAL_HEADER = {
+    (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1, /* magic number(swapped) */
+    0x02, 0x00,                                         /* major version number */
+    0x04, 0x00,                                         /* minor version number */
+    0x00, 0x00, 0x00, 0x00,                             /* GMT to local correction */
+    0x00, 0x00, 0x00, 0x00,                             /* accuracy of timestamps */
+    (byte) 0xff, (byte) 0xff, 0x00, 0x00,               /* max length of captured packets, in octets */
+    0x01, 0x00, 0x00, 0x00                              /* data link type(ethernet) */
+  };
+
+  /** Packets written since the last flush. */
+  private int cachedPacketNum = 0;
+
+  /** Destination that receives the bytes on flush. */
+  private ByteArrayOutputStream baos;
+
+  /** Bytes accumulated since the last flush (a primitive buffer, no per-byte boxing). */
+  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
+
+  /**
+   * Instantiates a new pcap byte output stream that starts with a default
+   * global header.
+   *
+   * @param baos
+   *          the stream that receives the pcap bytes
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos) {
+    this.baos = baos;
+    pending.write(DEFAULT_GLOBAL_HEADER, 0, DEFAULT_GLOBAL_HEADER.length);
+  }
+
+  /**
+   * Instantiates a new pcap byte output stream that starts with a copy of the
+   * given global header.
+   *
+   * @param baos
+   *          the stream that receives the pcap bytes
+   * @param header
+   *          the global header to copy
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos, GlobalHeader header) {
+    this.baos = baos;
+    copyGlobalHeader(header);
+  }
+
+  /**
+   * Buffers the fields of {@code header} in little-endian order.
+   *
+   * @param header
+   *          the header to copy
+   */
+  private void copyGlobalHeader(GlobalHeader header) {
+    int magic = header.getMagicNumber();
+    // Every other field, and every record header, is emitted little-endian.
+    // A magic number held in its unswapped form must therefore be emitted
+    // little-endian too, otherwise readers would interpret the remaining
+    // fields with the wrong byte order. Any other value keeps its original
+    // big-endian byte sequence.
+    boolean unswapped = magic == MAGIC_MICROS || magic == MAGIC_NANOS;
+    addInt(unswapped ? magic : Integer.reverseBytes(magic));
+
+    addShort(header.getMajorVersion());
+    addShort(header.getMinorVersion());
+    addInt(header.getThiszone());
+    addInt(header.getSigfigs());
+    addInt(header.getSnaplen());
+    addInt(header.getNetwork());
+  }
+
+  /**
+   * Buffers one packet: its four header fields followed by its payload.
+   * The stream is flushed automatically after every
+   * {@code MAX_CACHED_PACKET_NUMBER} packets.
+   *
+   * @param packet
+   *          the packet to write
+   *
+   * @throws IOException
+   *           if an automatic flush fails
+   * @see org.krakenapps.pcap.PcapOutputStream#write(PcapPacket)
+   */
+  public void write(PcapPacket packet) throws IOException {
+    PacketHeader packetHeader = packet.getPacketHeader();
+
+    addInt(packetHeader.getTsSec());
+    addInt(packetHeader.getTsUsec());
+    addInt(packetHeader.getInclLen());
+    addInt(packetHeader.getOrigLen());
+
+    Buffer payload = packet.getPacketData();
+
+    try {
+      payload.mark();
+      while (true) {
+        pending.write(payload.get());
+      }
+    } catch (BufferUnderflowException e) {
+      // Expected: the buffer signals its end by underflowing. Restore the
+      // read position so the packet can be read again by the caller.
+      payload.reset();
+    }
+
+    cachedPacketNum++;
+    if (cachedPacketNum == MAX_CACHED_PACKET_NUMBER) {
+      flush();
+    }
+  }
+
+  /**
+   * Buffers a 32-bit value, least significant byte first.
+   *
+   * @param number
+   *          the number
+   */
+  private void addInt(int number) {
+    // ByteArrayOutputStream.write(int) stores only the low eight bits.
+    pending.write(number);
+    pending.write(number >>> 8);
+    pending.write(number >>> 16);
+    pending.write(number >>> 24);
+  }
+
+  /**
+   * Buffers a 16-bit value, least significant byte first.
+   *
+   * @param number
+   *          the number
+   */
+  private void addShort(short number) {
+    pending.write(number);
+    pending.write(number >> 8);
+  }
+
+  /**
+   * Moves all buffered bytes to the destination stream and resets the packet
+   * counter.
+   *
+   * @throws IOException
+   *           if writing to the destination fails
+   * @see org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+  public void flush() throws IOException {
+    pending.writeTo(baos);
+    pending.reset();
+    cachedPacketNum = 0;
+  }
+
+  /**
+   * Flushes any buffered bytes and closes the destination stream.
+   *
+   * @throws IOException
+   *           if flushing or closing fails
+   * @see org.krakenapps.pcap.PcapOutputStream#close()
+   */
+  public void close() throws IOException {
+    flush();
+    baos.close(); // $codepro.audit.disable closeInFinally
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java
new file mode 100644
index 0000000..48d25c7
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapMerger.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapMerger.
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapMerger {
+
+ /** The Constant LOG. */
+ private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+
+ /** The comparator for PcapPackets */
+ private static PcapPacketComparator PCAP_PACKET_COMPARATOR = new PcapPacketComparator();
+
+ /**
+ * Instantiates a new pcap merger.
+ */
+ private PcapMerger() { // $codepro.audit.disable emptyMethod
+ }
+
+ /**
+ * Merge two pcap byte arrays.
+ *
+ * @param baos
+ * the baos
+ * @param pcaps
+ * the pcaps
+ *
+ * @throws IOException
+ * if there is no byte array, no access permission, or other io
+ * related problems.
+ */
+ // public static void merge(byte[] to, byte[] from) throws IOException {
+ // PcapByteInputStream is = null;
+ // PcapByteOutputStream os = null;
+ // ByteArrayOutputStream baos = null;
+ // try {
+ // is = new PcapByteInputStream(from);
+ // baos = new ByteArrayOutputStream();
+ // os = new PcapByteOutputStream(baos, is.getGlobalHeader());
+ //
+ // writePacket(is, os);
+ // } finally {
+ // closeInput(is);
+ // if (baos != null) {
+ // baos.close();
+ // }
+ // closeOutput(os);
+ // }
+ // }
+
+ public static void merge(ByteArrayOutputStream baos, List<byte[]> pcaps)
+ throws IOException {
+ PcapByteInputStream is = null;
+ PcapByteOutputStream os = null;
+ ByteArrayOutputStream unsortedBaos = new ByteArrayOutputStream();
+
+ try {
+ int i = 1;
+ for (byte[] pcap : pcaps) {
+ is = new PcapByteInputStream(pcap);
+ if (i == 1) {
+ os = new PcapByteOutputStream(unsortedBaos, is.getGlobalHeader());
+ }
+
+ writePacket(is, os);
+ i++;
+ closeInput(is);
+ }
+ } finally {
+ if (unsortedBaos != null) {
+ unsortedBaos.close();
+ }
+ closeOutput(os);
+ sort(baos, unsortedBaos.toByteArray());
+ }
+ }
+
+ /**
+ * Merge byte array1 with byte array2, and write to output byte array. It
+ * doesn't hurt original pcap dump byte arrays.
+ *
+ * @param baos
+ * the baos
+ * @param pcaps
+ * the pcaps
+ *
+ * @throws IOException
+ * if there are no source byte arrays, have no read and/or write
+ * permissions, or anything else.
+ */
+ public static void merge(ByteArrayOutputStream baos, byte[]... pcaps) // $codepro.audit.disable
+ // overloadedMethods
+ throws IOException {
+ merge(baos, Arrays.asList(pcaps));
+
+ }
+
+ /**
+ * Sort the potentially unsorted byte array according to the timestamp
+ * in the packet header
+ *
+ * @param unsortedBytes
+ * a byte array of a pcap file
+ *
+ * @return byte array of a pcap file with packets in cronological order
+ *
+ * @throws IOException
+ * if there are no source byte arrays, have no read and or write
+ * permission, or anything else.
+ */
+ private static void sort(ByteArrayOutputStream baos, byte[] unsortedBytes) throws IOException {
+ PcapByteInputStream pcapIs = new PcapByteInputStream(unsortedBytes);
+ PcapByteOutputStream pcapOs = new PcapByteOutputStream(baos, pcapIs.getGlobalHeader());
+ PcapPacket packet;
+ ArrayList<PcapPacket> packetList = new ArrayList<PcapPacket>();
+
+ try {
+ while (true) {
+ packet = pcapIs.getPacket();
+ if (packet == null)
+ break;
+ packetList.add(packet);
+ LOG.debug("Presort packet: " + packet.getPacketHeader().toString());
+ }
+ } catch (EOFException e) {
+ //LOG.debug("Ignoreable exception in sort", e);
+ }
+
+ Collections.sort(packetList, PCAP_PACKET_COMPARATOR);
+ for (PcapPacket p : packetList) {
+ pcapOs.write(p);
+ LOG.debug("Postsort packet: " + p.getPacketHeader().toString());
+ }
+ pcapOs.close();
+ }
+
+ /**
+ * Write packet.
+ *
+ * @param is
+ * the is
+ * @param os
+ * the os
+ *
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
+ */
+ private static void writePacket(PcapByteInputStream is,
+ PcapByteOutputStream os) throws IOException {
+ PcapPacket packet = null;
+ try {
+ while (true) {
+ packet = is.getPacket();
+ if (packet == null) {
+ break;
+ }
+ os.write(packet);
+ }
+ } catch (EOFException e) {
+ //LOG.debug("Ignorable exception in writePacket", e);
+ }
+
+ }
+
+ /**
+ * Close input.
+ *
+ * @param is
+ * the is
+ */
+ private static void closeInput(PcapByteInputStream is) {
+ if (is == null) {
+ return;
+ }
+ try {
+ is.close(); // $codepro.audit.disable closeInFinally
+ } catch (IOException e) {
+ LOG.error("Failed to close input stream", e);
+ }
+ }
+
+ /**
+ * Close output.
+ *
+ * @param os
+ * the os
+ */
+ private static void closeOutput(PcapByteOutputStream os) {
+ if (os == null) {
+ return;
+ }
+ try {
+ os.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close output stream", e);
+
+ }
+ }
+
+ /**
+ * The main method.
+ *
+ * @param args
+ * the arguments
+ *
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
+ */
+ public static void main(String[] args) throws IOException {
+ byte[] b1 = FileUtils.readFileToByteArray(new File(
+ "/Users/sheetal/Downloads/constructedTcpDump.1.pcap"));
+ byte[] b2 = FileUtils.readFileToByteArray(new File(
+ "/Users/sheetal/Downloads/constructedTcpDump.2.pcap"));
+ byte[] b3 = FileUtils.readFileToByteArray(new File(
+ "/Users/sheetal/Downloads/constructedTcpDump.3.pcap"));
+
+ ByteArrayOutputStream boas = new ByteArrayOutputStream(); // $codepro.audit.disable
+ // closeWhereCreated
+ PcapMerger.merge(boas, b1, b2, b3);
+
+ FileUtils.writeByteArrayToFile(new File(
+ "/Users/sheetal/Downloads/constructedTcpDump.automerged.1.2.pcap"),
+ boas.toByteArray(), false);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java
new file mode 100644
index 0000000..96f64a0
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapPacketComparator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+
+public class PcapPacketComparator implements Comparator<PcapPacket> {
+
+ /** The Constant LOG. */
+ private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+
+ public int compare(PcapPacket p1, PcapPacket p2) {
+
+ Long p1time = new Long(p1.getPacketHeader().getTsSec()) * 1000000L + new Long(p1.getPacketHeader().getTsUsec());
+ Long p2time = new Long(p2.getPacketHeader().getTsSec()) * 1000000L + new Long(p2.getPacketHeader().getTsUsec());
+ Long delta = p1time - p2time;
+ LOG.debug("p1time: " + p1time.toString() + " p2time: " + p2time.toString() + " delta: " + delta.toString());
+ return delta.intValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java
new file mode 100644
index 0000000..fcfcafd
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterCallback.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.spout;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+import storm.kafka.Callback;
+import storm.kafka.EmitContext;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Spout callback that appends each (timestamp, raw packet) tuple to a Hadoop
+ * sequence file, prefixing every packet with a pcap global header. A new file
+ * is started when the configured time span or packet count is exceeded.
+ */
+public class HDFSWriterCallback implements Callback {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+
+  /** Little-endian pcap global header that is prepended to every stored packet. */
+  public static final byte[] PCAP_GLOBAL_HEADER = new byte[] {
+      (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1, 0x02, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00
+    , 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00
+  };
+
+  /** Constant tuple returned from every call to {@link #apply}. */
+  private static final List<Object> RET_TUPLE = ImmutableList.of((Object)Byte.valueOf((byte) 0x00), Byte.valueOf((byte)0x00));
+  private FileSystem fs;
+  private SequenceFile.Writer writer;
+  private HDFSWriterConfig config;
+  /** Timestamp of the tuple that opened the current file; 0 before the first file. */
+  private long batchStartTime;
+  /** Number of packets appended to the current file. */
+  private long numWritten;
+  private EmitContext context;
+
+  public HDFSWriterCallback() {
+  }
+
+  /**
+   * Sets the writer configuration.
+   *
+   * @param config the configuration to use
+   * @return this callback, for chaining
+   */
+  public HDFSWriterCallback withConfig(HDFSWriterConfig config) {
+    LOG.info("Configured: " + config);
+    this.config = config;
+    return this;
+  }
+
+  /**
+   * Appends one packet to the current sequence file, rolling over to a new
+   * file first when necessary. An {@link IOException} is logged and the
+   * packet is dropped.
+   *
+   * @param tuple element 0 is the {@link LongWritable} timestamp, element 1
+   *              the {@link BytesWritable} raw packet
+   * @param context the emit context of the call (not used here)
+   * @return a constant two-element tuple
+   */
+  @Override
+  public List<Object> apply(List<Object> tuple, EmitContext context) {
+    LongWritable ts = (LongWritable) tuple.get(0);
+    BytesWritable rawPacket = (BytesWritable) tuple.get(1);
+    try {
+      turnoverIfNecessary(ts.get());
+      // getBytes() exposes the backing array, which may be longer than the
+      // valid data; only the first getLength() bytes belong to the packet.
+      writer.append(ts, headerize(rawPacket.getBytes(), rawPacket.getLength()));
+      // Count the packet so that the packet-count rollover can trigger.
+      numWritten++;
+      writer.hflush();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      //drop? not sure..
+    }
+    return RET_TUPLE;
+  }
+
+  /**
+   * Builds the stored value: the pcap global header followed by the first
+   * {@code length} bytes of {@code packet}.
+   */
+  private static BytesWritable headerize(byte[] packet, int length) {
+    byte[] ret = new byte[PCAP_GLOBAL_HEADER.length + length];
+    System.arraycopy(PCAP_GLOBAL_HEADER, 0, ret, 0, PCAP_GLOBAL_HEADER.length);
+    System.arraycopy(packet, 0, ret, PCAP_GLOBAL_HEADER.length, length);
+    return new BytesWritable(ret);
+  }
+
+  /**
+   * Starts a new sequence file when no file is open yet, when {@code ts} is
+   * more than the configured maximum time after the start of the current
+   * file, or when more than the configured number of packets was written.
+   *
+   * @param ts timestamp of the tuple about to be written
+   * @throws IOException if closing the old writer or creating the new one fails
+   */
+  private synchronized void turnoverIfNecessary(long ts) throws IOException {
+    long duration = ts - batchStartTime;
+    boolean firstFile = batchStartTime == 0L;
+    boolean tooOld = duration > config.getMaxTimeMS();
+    boolean tooLarge = numWritten > config.getNumPackets();
+    if (!(firstFile || tooOld || tooLarge)) {
+      return;
+    }
+
+    //turnover
+    Path path = getPath(ts);
+    if (writer != null) {
+      writer.close();
+    }
+    writer = SequenceFile.createWriter(new Configuration()
+        , SequenceFile.Writer.file(path)
+        , SequenceFile.Writer.keyClass(LongWritable.class)
+        , SequenceFile.Writer.valueClass(BytesWritable.class)
+    );
+    //reset state
+    LOG.info("Turning over and writing to " + path);
+    batchStartTime = ts;
+    numWritten = 0;
+  }
+
+  /**
+   * Builds the file path {@code <outputPath>/pcap_<ts>_<uuid>}, where the
+   * UUID comes from the emit context passed to {@link #initialize}.
+   */
+  private Path getPath(long ts) {
+    String fileName = Joiner.on("_").join("pcap"
+        , "" + ts
+        , context.get(EmitContext.Type.UUID)
+    );
+    return new Path(config.getOutputPath(), fileName);
+  }
+
+  /**
+   * Stores the emit context and obtains the file system.
+   *
+   * @param context the emit context, later used to name output files
+   * @throws IllegalStateException if the file system cannot be created
+   */
+  @Override
+  public void initialize(EmitContext context) {
+    this.context = context;
+    try {
+      fs = FileSystem.get(new Configuration());
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to create filesystem", e);
+    }
+  }
+
+  /**
+   * Closes the current sequence file writer, if one is open.
+   *
+   * @throws Exception if the writer cannot be closed
+   */
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      writer.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java
new file mode 100644
index 0000000..60a23c2
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/spout/HDFSWriterConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.spout;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HDFSWriterConfig implements Serializable {
+ static final long serialVersionUID = 0xDEADBEEFL;
+ private long numPackets;
+ private long maxTimeMS;
+ private String outputPath;
+ private String zookeeperQuorum;
+
+ public HDFSWriterConfig withOutputPath(String path) {
+ outputPath = path;
+ return this;
+ }
+
+ public HDFSWriterConfig withNumPackets(long n) {
+ numPackets = n;
+ return this;
+ }
+
+ public HDFSWriterConfig withMaxTimeMS(long t) {
+ maxTimeMS = t;
+ return this;
+ }
+
+ public HDFSWriterConfig withZookeeperQuorum(String zookeeperQuorum) {
+ this.zookeeperQuorum = zookeeperQuorum;
+ return this;
+ }
+
+ public List<String> getZookeeperServers() {
+ List<String> out = new ArrayList<>();
+ if(zookeeperQuorum != null) {
+ for (String hostPort : Splitter.on(',').split(zookeeperQuorum)) {
+ Iterable<String> tokens = Splitter.on(':').split(hostPort);
+ String host = Iterables.getFirst(tokens, null);
+ if(host != null) {
+ out.add(host);
+ }
+ }
+ }
+ return out;
+ }
+
+ public Integer getZookeeperPort() {
+ if(zookeeperQuorum != null) {
+ String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
+ String portStr = Iterables.getLast(Splitter.on(':').split(hostPort));
+ return Integer.parseInt(portStr);
+ }
+ return null;
+ }
+
+ public String getOutputPath() {
+ return outputPath;
+ }
+
+ public long getNumPackets() {
+ return numPackets;
+ }
+
+ public long getMaxTimeMS() {
+ return maxTimeMS;
+ }
+
+ @Override
+ public String toString() {
+ return "HDFSWriterConfig{" +
+ "numPackets=" + numPackets +
+ ", maxTimeMS=" + maxTimeMS +
+ ", outputPath='" + outputPath + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java
new file mode 100644
index 0000000..48e99d2
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/utils/PcapUtils.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import org.json.simple.JSONObject;
+
+/**
+ * Static helpers for PCAP processing: building session keys from packet
+ * header fields, converting IPv4 addresses to and from their hex form, and
+ * translating between IP protocol numbers and protocol names.
+ */
+public class PcapUtils {
+
+  /** Separator placed between the components of a session key. */
+  private static final char SESSION_KEY_SEPARATOR = '-';
+
+  /** Text written into a session key for a component that was not supplied. */
+  private static final String MISSING_KEY_COMPONENT = "0";
+
+  /** Protocol number to protocol name. Numbers absent from the table have no name here. */
+  private static final BiMap<Integer, String> protocolIdToNameMap = HashBiMap
+      .create();
+
+  static {
+    protocolIdToNameMap.put(0, "HOPOPT");
+    protocolIdToNameMap.put(1, "ICMP");
+    protocolIdToNameMap.put(2, "IGMP");
+    protocolIdToNameMap.put(3, "GGP");
+    protocolIdToNameMap.put(4, "IPV4");
+    protocolIdToNameMap.put(5, "ST");
+    protocolIdToNameMap.put(6, "TCP");
+    protocolIdToNameMap.put(7, "CBT");
+    protocolIdToNameMap.put(8, "EGP");
+    protocolIdToNameMap.put(9, "IGP");
+    protocolIdToNameMap.put(10, "BBN-RCC-MON");
+    protocolIdToNameMap.put(11, "NVP-II");
+    protocolIdToNameMap.put(12, "PUP");
+    protocolIdToNameMap.put(13, "ARGUS");
+    protocolIdToNameMap.put(14, "EMCON");
+    protocolIdToNameMap.put(15, "XNET");
+    protocolIdToNameMap.put(16, "CHAOS");
+    protocolIdToNameMap.put(17, "UDP");
+    protocolIdToNameMap.put(18, "MUX");
+    protocolIdToNameMap.put(19, "DCN-MEAS");
+    protocolIdToNameMap.put(20, "HMP");
+    protocolIdToNameMap.put(21, "PRM");
+    protocolIdToNameMap.put(22, "XNS-IDP");
+    protocolIdToNameMap.put(23, "TRUNK-1");
+    protocolIdToNameMap.put(24, "TRUNK-2");
+    protocolIdToNameMap.put(25, "LEAF-1");
+    protocolIdToNameMap.put(26, "LEAF-2");
+    protocolIdToNameMap.put(27, "RDP");
+    protocolIdToNameMap.put(28, "IRTP");
+    protocolIdToNameMap.put(29, "ISO-TP4");
+    protocolIdToNameMap.put(30, "NETBLT");
+    protocolIdToNameMap.put(31, "MFE-NSP");
+    protocolIdToNameMap.put(32, "MERIT-INP");
+    protocolIdToNameMap.put(33, "DCCP");
+    protocolIdToNameMap.put(34, "3PC");
+    protocolIdToNameMap.put(35, "IDPR");
+    protocolIdToNameMap.put(36, "XTP");
+    protocolIdToNameMap.put(37, "DDP");
+    protocolIdToNameMap.put(38, "IDPR-CMTP");
+    protocolIdToNameMap.put(39, "TP++");
+    protocolIdToNameMap.put(40, "IL");
+    protocolIdToNameMap.put(41, "IPV6");
+    protocolIdToNameMap.put(42, "SDRP");
+    protocolIdToNameMap.put(43, "IPV6-ROUTE");
+    protocolIdToNameMap.put(44, "IPV6-FRAG");
+    protocolIdToNameMap.put(45, "IDRP");
+    protocolIdToNameMap.put(46, "RSVP");
+    protocolIdToNameMap.put(47, "GRE");
+    protocolIdToNameMap.put(48, "DSR");
+    protocolIdToNameMap.put(49, "BNA");
+    protocolIdToNameMap.put(50, "ESP");
+    protocolIdToNameMap.put(51, "AH");
+    protocolIdToNameMap.put(52, "I-NLSP");
+    protocolIdToNameMap.put(53, "SWIPE");
+    protocolIdToNameMap.put(54, "NARP");
+    protocolIdToNameMap.put(55, "MOBILE");
+    protocolIdToNameMap.put(56, "TLSP");
+    protocolIdToNameMap.put(57, "SKIP");
+    protocolIdToNameMap.put(58, "IPV6-ICMP");
+    protocolIdToNameMap.put(59, "IPV6-NONXT");
+    protocolIdToNameMap.put(60, "IPV6-OPTS");
+    protocolIdToNameMap.put(62, "CFTP");
+    protocolIdToNameMap.put(64, "SAT-EXPAK");
+    protocolIdToNameMap.put(65, "KRYPTOLAN");
+    protocolIdToNameMap.put(66, "RVD");
+    protocolIdToNameMap.put(67, "IPPC");
+    protocolIdToNameMap.put(69, "SAT-MON");
+    protocolIdToNameMap.put(70, "VISA");
+    protocolIdToNameMap.put(71, "IPCV");
+    protocolIdToNameMap.put(72, "CPNX");
+    protocolIdToNameMap.put(73, "CPHB");
+    protocolIdToNameMap.put(74, "WSN");
+    protocolIdToNameMap.put(75, "PVP");
+    protocolIdToNameMap.put(76, "BR-SAT-MON");
+    protocolIdToNameMap.put(77, "SUN-ND");
+    protocolIdToNameMap.put(78, "WB-MON");
+    protocolIdToNameMap.put(79, "WB-EXPAK");
+    protocolIdToNameMap.put(80, "ISO-IP");
+    protocolIdToNameMap.put(81, "VMTP");
+    protocolIdToNameMap.put(82, "SECURE-VMTP");
+    protocolIdToNameMap.put(83, "VINES");
+    protocolIdToNameMap.put(84, "TTP");
+    protocolIdToNameMap.put(85, "NSFNET-IGP");
+    protocolIdToNameMap.put(86, "DGP");
+    protocolIdToNameMap.put(87, "TCF");
+    protocolIdToNameMap.put(88, "EIGRP");
+    protocolIdToNameMap.put(89, "OSPFIGP");
+    protocolIdToNameMap.put(90, "SPRITE-RPC");
+    protocolIdToNameMap.put(91, "LARP");
+    protocolIdToNameMap.put(92, "MTP");
+    protocolIdToNameMap.put(93, "AX.25");
+    protocolIdToNameMap.put(94, "IPIP");
+    protocolIdToNameMap.put(95, "MICP");
+    protocolIdToNameMap.put(96, "SCC-SP");
+    protocolIdToNameMap.put(97, "ETHERIP");
+    protocolIdToNameMap.put(98, "ENCAP");
+    protocolIdToNameMap.put(100, "GMTP");
+    protocolIdToNameMap.put(101, "IFMP");
+    protocolIdToNameMap.put(102, "PNNI");
+    protocolIdToNameMap.put(103, "PIM");
+    protocolIdToNameMap.put(104, "ARIS");
+    protocolIdToNameMap.put(105, "SCPS");
+    protocolIdToNameMap.put(106, "QNX");
+    protocolIdToNameMap.put(107, "A/N");
+    protocolIdToNameMap.put(108, "IPCOMP");
+    protocolIdToNameMap.put(109, "SNP");
+    protocolIdToNameMap.put(110, "COMPAQ-PEER");
+    protocolIdToNameMap.put(111, "IPX-IN-IP");
+    protocolIdToNameMap.put(112, "VRRP");
+    protocolIdToNameMap.put(113, "PGM");
+    protocolIdToNameMap.put(115, "L2TP");
+    protocolIdToNameMap.put(116, "DDX");
+    protocolIdToNameMap.put(117, "IATP");
+    protocolIdToNameMap.put(118, "STP");
+    protocolIdToNameMap.put(119, "SRP");
+    protocolIdToNameMap.put(120, "UTI");
+    protocolIdToNameMap.put(121, "SMP");
+    protocolIdToNameMap.put(122, "SM");
+    protocolIdToNameMap.put(123, "PTP");
+    protocolIdToNameMap.put(124, "ISIS OVER IPV4");
+    protocolIdToNameMap.put(125, "FIRE");
+    protocolIdToNameMap.put(126, "CRTP");
+    protocolIdToNameMap.put(127, "CRUDP");
+    protocolIdToNameMap.put(128, "SSCOPMCE");
+    protocolIdToNameMap.put(129, "IPLT");
+    protocolIdToNameMap.put(130, "SPS");
+    protocolIdToNameMap.put(131, "PIPE");
+    protocolIdToNameMap.put(132, "SCTP");
+    protocolIdToNameMap.put(133, "FC");
+    protocolIdToNameMap.put(134, "RSVP-E2E-IGNORE");
+    protocolIdToNameMap.put(135, "MOBILITY HEADER");
+    protocolIdToNameMap.put(136, "UDPLITE");
+    protocolIdToNameMap.put(137, "MPLS-IN-IP");
+    protocolIdToNameMap.put(138, "MANET");
+    protocolIdToNameMap.put(139, "HIP");
+    protocolIdToNameMap.put(140, "SHIM6");
+    protocolIdToNameMap.put(141, "WESP");
+    protocolIdToNameMap.put(142, "ROHC");
+  }
+
+  /** Inverse view of {@link #protocolIdToNameMap}: upper-case protocol name to number. */
+  private static final BiMap<String, Integer> protocolNameToIdMap = protocolIdToNameMap
+      .inverse();
+
+  /**
+   * Converts a dotted IPv4 address to hex, two lower-case hex digits per
+   * segment (for example {@code "10.0.0.1"} becomes {@code "0a000001"}).
+   *
+   * @param ipAddress
+   *          the dotted IPv4 address
+   * @return the concatenated hex form of every segment
+   * @throws NumberFormatException
+   *           if a segment is not a decimal integer
+   */
+  public static String convertIpv4IpToHex(String ipAddress) {
+    StringBuilder hexIp = new StringBuilder(8);
+    for (String ipSegment : ipAddress.split("\\.")) {
+      hexIp.append(convertIpSegmentToHex(ipSegment));
+    }
+    return hexIp.toString();
+  }
+
+  /**
+   * Converts a hex string back to dotted notation, reading two hex digits per
+   * segment (for example {@code "0a000001"} becomes {@code "10.0.0.1"}).
+   *
+   * @param hex
+   *          the hex string; must contain an even number of digits
+   * @return the segments in decimal, joined with '.'
+   * @throws IllegalArgumentException
+   *           if {@code hex} has an odd length, or (as
+   *           {@link NumberFormatException}) contains a non-hex pair
+   */
+  public static String convertHexToIpv4Ip(String hex) {
+    // Fail with a clear message instead of running off the end of the string.
+    if (hex.length() % 2 != 0) {
+      throw new IllegalArgumentException(
+          "Hex IP must have an even number of digits: '" + hex + "'");
+    }
+    List<Integer> ipSegments = new ArrayList<>(hex.length() / 2);
+    for (int i = 0; i < hex.length(); i += 2) {
+      ipSegments.add(Integer.parseInt(hex.substring(i, i + 2), 16));
+    }
+    return Joiner.on(".").join(ipSegments);
+  }
+
+  /**
+   * Gets the session key for a 5-tuple; the ip id and fragment offset
+   * components are written as {@code "0"}.
+   *
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol, or null for "0"
+   * @param srcPort
+   *          the src port, or null for "0"
+   * @param dstPort
+   *          the dst port, or null for "0"
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort) {
+    return getSessionKey(srcIp, dstIp, protocol, srcPort, dstPort, null, null);
+  }
+
+  /**
+   * Gets the session key:
+   * {@code hex(srcIp)-hex(dstIp)-protocol-srcPort-dstPort-ipId-fragmentOffset}.
+   * Any null component other than the two addresses is written as {@code "0"}.
+   *
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @param ipId
+   *          the ip id
+   * @param fragmentOffset
+   *          the fragment offset
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort, String ipId,
+      String fragmentOffset) {
+    return new StringBuilder(40)
+        .append(getPartialSessionKey(srcIp, dstIp, protocol, srcPort, dstPort))
+        .append(SESSION_KEY_SEPARATOR).append(orMissing(ipId))
+        .append(SESSION_KEY_SEPARATOR).append(orMissing(fragmentOffset))
+        .toString();
+  }
+
+  /**
+   * Gets the session key from a parsed message, reading the fields
+   * {@code ip_src_addr}, {@code ip_dst_addr}, {@code ip_protocol},
+   * {@code ip_src_port}, {@code ip_dst_port}, {@code ip_id} and
+   * {@code frag_offset}. Absent non-address fields are written as
+   * {@code "0"}, matching the String overload.
+   *
+   * @param message
+   *          the message holding the packet header fields
+   * @return the session key
+   */
+  public static String getSessionKey(JSONObject message) {
+    return getSessionKey(
+        (String) message.get("ip_src_addr"),
+        (String) message.get("ip_dst_addr"),
+        toStringOrNull(message.get("ip_protocol")),
+        toStringOrNull(message.get("ip_src_port")),
+        toStringOrNull(message.get("ip_dst_port")),
+        toStringOrNull(message.get("ip_id")),
+        toStringOrNull(message.get("frag_offset")));
+  }
+
+  /**
+   * Gets the 5-tuple prefix of the session key:
+   * {@code hex(srcIp)-hex(dstIp)-protocol-srcPort-dstPort}. Null protocol or
+   * ports are written as {@code "0"}.
+   *
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @return the partial session key
+   */
+  public static String getPartialSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort) {
+    return new StringBuilder(40)
+        .append(convertIpv4IpToHex(srcIp)).append(SESSION_KEY_SEPARATOR)
+        .append(convertIpv4IpToHex(dstIp)).append(SESSION_KEY_SEPARATOR)
+        .append(orMissing(protocol)).append(SESSION_KEY_SEPARATOR)
+        .append(orMissing(srcPort)).append(SESSION_KEY_SEPARATOR)
+        .append(orMissing(dstPort))
+        .toString();
+  }
+
+  /**
+   * Gets the session key from numeric header fields.
+   *
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @param ipId
+   *          the ip id
+   * @param fragmentOffset
+   *          the fragment offset
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp, int protocol,
+      int srcPort, int dstPort, int ipId, int fragmentOffset) {
+    return new StringBuilder(40)
+        .append(getShortSessionKey(srcIp, dstIp, protocol, srcPort, dstPort))
+        .append(SESSION_KEY_SEPARATOR).append(ipId)
+        .append(SESSION_KEY_SEPARATOR).append(fragmentOffset)
+        .toString();
+  }
+
+  /**
+   * Gets the short session key. (5-tuple only)
+   *
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @return the session key
+   */
+  public static String getShortSessionKey(String srcIp, String dstIp, int protocol,
+      int srcPort, int dstPort) {
+    return new StringBuilder(40)
+        .append(convertIpv4IpToHex(srcIp)).append(SESSION_KEY_SEPARATOR)
+        .append(convertIpv4IpToHex(dstIp)).append(SESSION_KEY_SEPARATOR)
+        .append(protocol).append(SESSION_KEY_SEPARATOR)
+        .append(srcPort).append(SESSION_KEY_SEPARATOR)
+        .append(dstPort)
+        .toString();
+  }
+
+  /**
+   * Convert ip segment to hex.
+   *
+   * @param ipSegment
+   *          the ip segment as decimal text
+   * @return the segment in hex, left-padded to two digits
+   * @throws NumberFormatException
+   *           if the segment is not a decimal integer
+   */
+  public static String convertIpSegmentToHex(String ipSegment) {
+    return convertIpSegmentToHex(Integer.parseInt(ipSegment));
+  }
+
+  /**
+   * Convert ip segment to hex.
+   *
+   * @param ipSegment
+   *          the ip segment
+   * @return the segment in hex, left-padded to two digits
+   */
+  public static String convertIpSegmentToHex(int ipSegment) {
+    return convertToHex(ipSegment, 2);
+  }
+
+  /**
+   * Convert to hex, left-padding with '0' up to {@code length} digits. Values
+   * whose hex form is already longer than {@code length} are not truncated.
+   *
+   * @param number
+   *          the number
+   * @param length
+   *          the minimum length of the result
+   * @return the lower-case hex string
+   */
+  public static String convertToHex(int number, int length) {
+    return StringUtils.leftPad(Integer.toHexString(number), length, '0');
+  }
+
+  /**
+   * Gets the protocol name.
+   *
+   * @param protocolNumber
+   *          the protocol number
+   * @return the protocol name, or the number as text when it has no entry in
+   *         the table
+   */
+  public static String getProtocolNameFromId(int protocolNumber) {
+    String protocolName = protocolIdToNameMap.get(protocolNumber);
+    return protocolName != null ? protocolName : String.valueOf(protocolNumber);
+  }
+
+  /**
+   * Gets the protocol id from name. The lookup is case-insensitive.
+   *
+   * @param protocolName
+   *          the protocol name; may be null
+   * @return the protocol id, or -1 when the name is null or unknown
+   */
+  public static int getProtocolIdFromName(String protocolName) {
+    if (protocolName == null) {
+      return -1;
+    }
+    // Locale.ROOT keeps the result independent of the JVM's default locale
+    // (e.g. Turkish dotted/dotless i would otherwise break "icmp").
+    Integer protocolNumber = protocolNameToIdMap
+        .get(protocolName.toUpperCase(java.util.Locale.ROOT));
+    return protocolNumber == null ? -1 : protocolNumber;
+  }
+
+  /**
+   * Returns the given key component, or "0" when it is null.
+   */
+  private static String orMissing(String component) {
+    return component == null ? MISSING_KEY_COMPONENT : component;
+  }
+
+  /**
+   * Returns {@code value.toString()}, or null when the value is null.
+   */
+  private static String toStringOrNull(Object value) {
+    return value == null ? null : value.toString();
+  }
+
+  /**
+   * Invert map. Not referenced within this class now that the protocol table
+   * is a {@link BiMap}.
+   *
+   * @param <V>
+   *          the value type
+   * @param <K>
+   *          the key type
+   * @param map
+   *          the map
+   * @return a new map from each value to its key
+   */
+  private static <V, K> Map<V, K> invertMap(Map<K, V> map) {
+    Map<V, K> inv = new HashMap<>();
+    for (Entry<K, V> entry : map.entrySet()) {
+      inv.put(entry.getValue(), entry.getKey());
+    }
+    return inv;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
new file mode 100644
index 0000000..c07a217
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.hbase.writer.HBaseWriter;
+import org.apache.metron.pcap.utils.PcapUtils;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link HBaseWriter} for PCAP data: the row key is the packet's session key,
+ * the timestamp comes from the message's {@code ts_micro} field, and the
+ * value is the tuple's first binary field stored under a single configured column.
+ */
+public class PcapWriter extends HBaseWriter {
+
+  /** Name under which the raw tuple bytes are returned from {@link #getValues}. */
+  private final String column;
+
+  /**
+   * @param tableName the table name handed to {@link HBaseWriter}
+   * @param column the key under which the packet bytes are stored
+   */
+  public PcapWriter(String tableName, String column) {
+    super(tableName);
+    this.column = column;
+  }
+
+  /**
+   * Builds the row key from the message's session key.
+   *
+   * @return the session key encoded as UTF-8, so the key bytes do not depend
+   *         on the JVM's default charset
+   */
+  @Override
+  public byte[] getKey(Tuple tuple, JSONObject message) {
+    String key = PcapUtils.getSessionKey(message);
+    return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Reads the {@code ts_micro} field of the message.
+   *
+   * @return the timestamp as a long; any {@link Number} value is accepted
+   * @throws NullPointerException if the message has no {@code ts_micro} field
+   * @throws ClassCastException if {@code ts_micro} is not a number
+   */
+  @Override
+  public long getTimestamp(Tuple tuple, JSONObject message) {
+    // Same exception type as the implicit unboxing NPE, but with a message that names the field.
+    Object timestamp = java.util.Objects.requireNonNull(
+        message.get("ts_micro"), "message has no 'ts_micro' field");
+    return ((Number) timestamp).longValue();
+  }
+
+  /**
+   * @return a single-entry map from the configured column to the tuple's first binary field
+   */
+  @Override
+  public Map<String, byte[]> getValues(Tuple tuple, JSONObject message) {
+    Map<String, byte[]> values = new HashMap<>();
+    values.put(column, tuple.getBinary(0));
+    return values;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
new file mode 100644
index 0000000..ff05c29
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A serializable, closeable hook that receives a tuple (as a list of values)
+ * together with an {@link EmitContext} and returns a list of values.
+ *
+ * NOTE(review): the package name suggests this is invoked by the Kafka spout
+ * around tuple emission; confirm against the caller.
+ */
+public interface Callback extends AutoCloseable, Serializable {
+ /**
+  * Processes one tuple.
+  *
+  * @param tuple the tuple values
+  * @param context the emit context supplied by the caller
+  * @return a list of values; presumably the tuple to emit in place of the input (TODO confirm)
+  */
+ List<Object> apply(List<Object> tuple, EmitContext context);
+ /**
+  * Prepares the callback with the given context.
+  * NOTE(review): presumably called once before any {@link #apply} call; confirm against the caller.
+  *
+  * @param context the emit context supplied by the caller
+  */
+ void initialize(EmitContext context);
+}