You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by om...@apache.org on 2015/12/08 07:38:12 UTC

[48/51] [partial] incubator-metron git commit: Initial import of code from https://github.com/OpenSOC/opensoc at ac0b00373f8f56dfae03a8109af5feb373ea598e.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
new file mode 100644
index 0000000..08f3b44
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.opensoc.json.serialization;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+import static com.opensoc.json.serialization.JSONEncoderHelper.*;
+import static com.opensoc.json.serialization.JSONDecoderHelper.*;
+
+/**
+ * JSON Serailization class for kafka. Implements kafka Encoder and Decoder
+ * String, JSONObject, Number, Boolean,JSONObject.NULL JSONArray
+ * 
+ * @author kiran
+ * 
+ */
+
+public class JSONKafkaSerializer implements Encoder<JSONObject>,
+		Decoder<JSONObject> {
+
+	// Object ID's for different types
+	public static final byte StringID = 1;
+	public static final byte JSONObjectID = 2;
+	public static final byte NumberID = 3;
+	public static final byte BooleanID = 4;
+	public static final byte NULLID = 5;
+	public static final byte JSONArrayID = 6;
+
+	public JSONKafkaSerializer() {
+		// Blank constructor needed by Storm
+
+	}
+
+	public JSONKafkaSerializer(VerifiableProperties props) {
+		// Do Nothing. constructor needed by Storm
+	}
+
+	/*
+	 * Main Method for unit testing
+	 */
+	public static void main(String args[]) throws IOException {
+
+		//String Input = "/home/kiran/git/opensoc-streaming/OpenSOC-Common/BroExampleOutput";
+		String Input = "/tmp/test";
+
+		BufferedReader reader = new BufferedReader(new FileReader(Input));
+
+		// String jsonString =
+		// "{\"dns\":{\"ts\":[14.0,12,\"kiran\"],\"uid\":\"abullis@mail.csuchico.edu\",\"id.orig_h\":\"10.122.196.204\", \"endval\":null}}";
+		String jsonString ="";// reader.readLine();
+		JSONParser parser = new JSONParser();
+		JSONObject json = null;
+		int count = 1;
+
+		if (args.length > 0)
+			count = Integer.parseInt(args[0]);
+
+		//while ((jsonString = reader.readLine()) != null) 
+		jsonString = reader.readLine();
+		{
+			try {
+				json = (JSONObject) parser.parse(jsonString);
+				System.out.println(json);
+			} catch (ParseException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+
+			String jsonString2 = null;
+
+			JSONKafkaSerializer ser = new JSONKafkaSerializer();
+
+			for (int i = 0; i < count; i++) {
+				byte[] bytes = ser.toBytes(json);
+
+				jsonString2 = ((JSONObject)ser.fromBytes(bytes)).toJSONString();
+			}
+			System.out.println((jsonString2));
+			System.out
+					.println(jsonString2.equalsIgnoreCase(json.toJSONString()));
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	public JSONObject fromBytes(byte[] input) {
+
+		ByteArrayInputStream inputBuffer = new ByteArrayInputStream(input);
+		DataInputStream data = new DataInputStream(inputBuffer);
+
+		JSONObject output = new JSONObject();
+
+		try {
+			int mapSize = data.readInt();
+
+			for (int i = 0; i < mapSize; i++) {
+				String key = (String) getObject(data);
+				// System.out.println("Key Found"+ key);
+				Object val = getObject(data);
+				output.put(key, val);
+			}
+
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return null;
+		}
+
+		return output;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public JSONObject fromBytes1(DataInputStream data) {
+
+		//ByteArrayInputStream inputBuffer = new ByteArrayInputStream(input);
+		//DataInputStream data = new DataInputStream(inputBuffer);
+
+		JSONObject output = new JSONObject();
+
+		try {
+			int mapSize = data.readInt();
+
+			for (int i = 0; i < mapSize; i++) {
+				String key = (String) getObject(data);
+				// System.out.println("Key Found"+ key);
+				Object val = getObject(data);
+				output.put(key, val);
+			}
+
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+			return null;
+		}
+
+		return output;
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public byte[] toBytes(JSONObject input) {
+
+		ByteArrayOutputStream outputBuffer = new ByteArrayOutputStream();
+		DataOutputStream data = new DataOutputStream(outputBuffer);
+
+		Iterator it = input.entrySet().iterator();
+		try {
+
+			// write num of entries into output. 
+			//each KV pair is counted as an entry
+			data.writeInt(input.size());
+
+			// Write every single entry in hashmap
+			//Assuming key to be String.
+			while (it.hasNext()) {
+				Map.Entry<String, Object> entry = (Entry<String, Object>) it
+						.next();
+				putObject(data, entry.getKey());
+				putObject(data, entry.getValue());
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			return null;
+		}
+
+		return outputBuffer.toByteArray();
+	}
+
+	private void putObject(DataOutputStream data, Object value)
+			throws IOException {
+
+		//Check object type and invoke appropriate method
+		if (value instanceof JSONObject) {
+			putJSON(data, (JSONObject) value);
+			return;
+
+		}
+
+		if (value instanceof String) {
+			putString(data, (String) value);
+			return;
+		}
+
+		if (value instanceof Number) {
+			putNumber(data, (Number) value);
+			return;
+		}
+
+		if (value instanceof Boolean) {
+			putBoolean(data, (Boolean) value);
+			return;
+		}
+
+		if (value == null) {
+			putNull(data, value);
+			return;
+		}
+
+		if (value instanceof JSONArray) {
+			putArray(data, (JSONArray) value);
+			return;
+		}
+
+	}
+
+	private void putJSON(DataOutputStream data, JSONObject value)
+			throws IOException {
+
+		// JSON ID is 2
+		data.writeByte(JSONKafkaSerializer.JSONObjectID);
+		data.write(toBytes(value));
+
+	}
+
+	public void putArray(DataOutputStream data, JSONArray array)
+			throws IOException {
+
+		data.writeByte(JSONKafkaSerializer.JSONArrayID);
+
+		data.writeInt(array.size());
+
+		for (Object o : array)
+			putObject(data, o);
+
+	}
+
+
+	
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKryoSerializer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKryoSerializer.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKryoSerializer.java
new file mode 100644
index 0000000..7b7d394
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKryoSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.json.serialization;
+
+import org.json.simple.JSONObject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * @author kiran Custom Serializer to help Storm encode and decode JSONObjects
+ */
+
+public class JSONKryoSerializer extends
+		com.esotericsoftware.kryo.Serializer<JSONObject> {
+
+	// JSONKafkaSerializer object actually does the heavy lifting.
+	private JSONKafkaSerializer jsonSerde = new JSONKafkaSerializer();
+
+	@Override
+	public void write(Kryo kryo, Output output, JSONObject json) {
+
+		byte[] bytes = jsonSerde.toBytes(json);
+		output.writeInt(bytes.length);
+		output.write(bytes);
+	}
+
+	@Override
+	public JSONObject read(Kryo kryo, Input input, Class<JSONObject> type) {
+
+		// Get number of Entries
+		int size = input.readInt();
+		byte[] bytes = input.readBytes(size);
+
+		JSONObject json = jsonSerde.fromBytes(bytes);
+
+		return json;
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MetricReporter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MetricReporter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MetricReporter.java
new file mode 100644
index 0000000..3d344a2
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MetricReporter.java
@@ -0,0 +1,89 @@
+package com.opensoc.metrics;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.graphite.Graphite;
+import com.codahale.metrics.graphite.GraphiteReporter;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+public class MetricReporter {
+
+	final MetricRegistry metrics = new MetricRegistry();
+	private ConsoleReporter consoleReporter = null;
+	private JmxReporter jmxReporter = null;
+	private GraphiteReporter graphiteReporter = null;
+
+	private Class _klas;
+	private String _topologyname = "topology";
+
+	/** The Constant LOGGER. */
+	private static final Logger _Logger = Logger
+			.getLogger(MetricReporter.class);
+
+	public void initialize(Map config, Class klas) {
+
+		_Logger.debug("===========Initializing Reporter");
+		this._klas = klas;
+		if (config.get("topologyname")!=null)
+			_topologyname = (String) config.get("topologyname");
+			
+		this.start(config);
+
+	}
+
+	public Counter registerCounter(String countername) {
+		return metrics.counter(MetricRegistry.name(_topologyname,_klas.getCanonicalName(), countername));
+	}
+
+	public void start(Map config) {
+		try {
+			if (config.get("reporter.jmx").equals("true")) {
+				jmxReporter = JmxReporter.forRegistry(metrics).build();
+				jmxReporter.start();
+			}
+
+			if (config.get("reporter.console").equals("true")) {
+				consoleReporter = ConsoleReporter.forRegistry(metrics).build();
+				consoleReporter.start(1, TimeUnit.SECONDS);
+			}
+
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+		try {
+			if (config.get("reporter.graphite").equals("true")) {
+				String address = (String) config.get("graphite.address");
+				int port = Integer.parseInt((String) config
+						.get("graphite.port"));
+
+				_Logger.debug("===========Graphite ADDRESS: " + address + ":"
+						+ port);
+
+				Graphite graphite = new Graphite(new InetSocketAddress(address,
+						port));
+				// Check if graphite connectivity works
+				graphite.connect();
+				graphite.close();
+
+				graphiteReporter = GraphiteReporter.forRegistry(metrics).build(
+						graphite);
+
+				_Logger.debug("---------******STARTING GRAPHITE*********---------");
+				graphiteReporter.start(1, TimeUnit.SECONDS);
+			}
+		}
+
+		catch (IOException io) {
+			_Logger.warn("Unable to Connect to Graphite");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MyMetricReporter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MyMetricReporter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MyMetricReporter.java
new file mode 100644
index 0000000..fc6752f
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/MyMetricReporter.java
@@ -0,0 +1,33 @@
+package com.opensoc.metrics;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.graphite.GraphiteReporter;
+
+public class MyMetricReporter extends MetricReporter {
+	
+	final MetricRegistry metrics = new MetricRegistry();
+	private ConsoleReporter consoleReporter = null;
+	private JmxReporter jmxReporter=null; 
+	private GraphiteReporter graphiteReporter = null;
+
+	
+	public MyMetricReporter(boolean withConsole, boolean withJMX, boolean witGraphite)
+	{
+		consoleReporter = ConsoleReporter.forRegistry(metrics).build();
+		jmxReporter = JmxReporter.forRegistry(metrics).build();
+		graphiteReporter = GraphiteReporter.forRegistry(metrics).build(null);
+	}
+	
+
+	public static void main(String[] args) {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void report() {
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/NullReporter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/NullReporter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/NullReporter.java
new file mode 100644
index 0000000..6585a32
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/metrics/NullReporter.java
@@ -0,0 +1,10 @@
+package com.opensoc.metrics;
+
+public class NullReporter extends MetricReporter {
+	
+	public void report()
+	{
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageFilter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageFilter.java
new file mode 100644
index 0000000..339a2ec
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageFilter.java
@@ -0,0 +1,10 @@
+package com.opensoc.parser.interfaces;
+
+import org.json.simple.JSONObject;
+
+
+public interface MessageFilter {
+
+	public boolean emitTuple(JSONObject message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
new file mode 100644
index 0000000..700d3ab
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
@@ -0,0 +1,10 @@
+package com.opensoc.parser.interfaces;
+
+import org.json.simple.JSONObject;
+
+public interface MessageParser {
+	
+	void initializeParser();
+	JSONObject parse(byte[] raw_message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/Constants.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/Constants.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/Constants.java
new file mode 100644
index 0000000..9192665
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/Constants.java
@@ -0,0 +1,21 @@
+package com.opensoc.pcap;
+
+
+/**
+* The Interface Constants.
+* 
+* @author sheetal
+* @version $Revision: 1.0 $
+*/
+public interface Constants {
+
+/** The protocol tcp. */
+public static final int PROTOCOL_TCP = 6;
+
+/** The protocol udp. */
+public static final int PROTOCOL_UDP = 17;
+
+/** The document key separator. */
+public static final char DOCUMENT_KEY_SEPARATOR = '-';
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/IEEE_802_1Q.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/IEEE_802_1Q.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/IEEE_802_1Q.java
new file mode 100644
index 0000000..6375a3f
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/IEEE_802_1Q.java
@@ -0,0 +1,27 @@
+package com.opensoc.pcap;
+
+public class IEEE_802_1Q {
+
+	  int priorityCodePoint = 0;
+	  int dropEligibleIndicator = 0;
+	  int vLANIdentifier = 0;
+
+	  public IEEE_802_1Q(int priorityCodePoint, int dropEligibleIndicator,
+	      int vLANIdentifier) {
+	    this.priorityCodePoint = priorityCodePoint;
+	    this.dropEligibleIndicator = dropEligibleIndicator;
+	    this.vLANIdentifier = vLANIdentifier;
+	  }
+
+	  public int getPriorityCodePoint() {
+	    return priorityCodePoint;
+	  }
+
+	  public int getDropEligibleIndicator() {
+	    return dropEligibleIndicator;
+	  }
+
+	  public int getvLANIdentifier() {
+	    return vLANIdentifier;
+	  }
+	}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/OpenSocEthernetDecoder.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/OpenSocEthernetDecoder.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/OpenSocEthernetDecoder.java
new file mode 100644
index 0000000..2dee341
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/OpenSocEthernetDecoder.java
@@ -0,0 +1,117 @@
+package com.opensoc.pcap;
+
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetFrame;
+import org.krakenapps.pcap.decoder.ethernet.EthernetProcessor;
+import org.krakenapps.pcap.decoder.ethernet.MacAddress;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+public class OpenSocEthernetDecoder extends EthernetDecoder {
+
+  private Set<EthernetProcessor> callbacks;
+  private Map<Integer, Set<EthernetProcessor>> typeCallbacks;
+
+  public OpenSocEthernetDecoder() {
+    callbacks = new CopyOnWriteArraySet<EthernetProcessor>();
+    typeCallbacks = new ConcurrentHashMap<Integer, Set<EthernetProcessor>>();
+  }
+
+  public void register(EthernetProcessor processor) {
+    this.callbacks.add(processor);
+  }
+
+  public void register(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null) {
+      processors = new HashSet<EthernetProcessor>();
+      typeCallbacks.put(type, processors);
+    }
+
+    processors.add(processor);
+  }
+
+  public void unregister(EthernetProcessor processor) {
+    this.callbacks.remove(processor);
+  }
+
+  public void unregister(int type, EthernetProcessor processor) {
+    Set<EthernetProcessor> processors = typeCallbacks.get(type);
+    if (processors == null)
+      return;
+
+    processors.remove(processor);
+  }
+
+  public void decode(PcapPacket packet) {
+    // do not reorder following codes (parse sequence)
+    MacAddress destination = getMacAddress(packet.getPacketData());
+    MacAddress source = getMacAddress(packet.getPacketData());
+    int type = getEtherType(packet.getPacketData());
+
+    if (type == 0x8100) {
+      // It is 802.1Q VLAN tag
+      IEEE_802_1Q iee802_1qTag = get802_1qTag(packet.getPacketData());
+      // Now get the type
+      type = getEtherType(packet.getPacketData());
+    }
+
+    Buffer buffer = packet.getPacketData();
+    buffer.discardReadBytes();
+
+    EthernetFrame frame = new EthernetFrame(source, destination, type, buffer);
+    frame.setPcapPacket(packet);
+    dispatch(frame);
+  }
+
+  private MacAddress getMacAddress(Buffer data) {
+    byte[] mac = new byte[6];
+    data.gets(mac, 0, 6);
+    return new MacAddress(mac);
+  }
+
+  private int getEtherType(Buffer data) {
+    return ((int) data.getShort()) & 0x0000FFFF;
+  }
+
+  private IEEE_802_1Q get802_1qTag(Buffer data) {
+
+    // reference http://en.wikipedia.org/wiki/EtherType &
+    // http://en.wikipedia.org/wiki/IEEE_802.1Q
+    byte[] b802_1qTag = new byte[2];
+    data.gets(b802_1qTag, 0, 2);
+    BitSet bits = BitSet.valueOf(b802_1qTag);
+    int pcp = convertBitToInt(bits.get(0, 3));
+    int dei = convertBitToInt(bits.get(3, 4));
+    int vid = convertBitToInt(bits.get(4, 16));
+
+    return new IEEE_802_1Q(pcp, dei, vid);
+  }
+
+  public static int convertBitToInt(BitSet bits) {
+    int value = 0;
+    for (int i = 0; i < bits.length(); ++i) {
+      value += bits.get(i) ? (1 << i) : 0;
+    }
+    return value;
+  }
+
+  private void dispatch(EthernetFrame frame) {
+    for (EthernetProcessor processor : callbacks)
+      processor.process(frame);
+
+    Set<EthernetProcessor> processors = typeCallbacks.get(frame.getType());
+    if (processors == null)
+      return;
+
+    for (EthernetProcessor processor : processors)
+      processor.process(frame.dup());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
new file mode 100644
index 0000000..151e3d3
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
@@ -0,0 +1,401 @@
+package com.opensoc.pcap;
+
+import java.text.MessageFormat;
+
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+
+/**
+ * The Class PacketInfo.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PacketInfo {
+
+  /** The packetHeader. */
+  private PacketHeader packetHeader = null;
+
+  /** The packet. */
+  private PcapPacket packet = null;
+
+  /** The ipv4 packet. */
+  private Ipv4Packet ipv4Packet = null;
+
+  /** The tcp packet. */
+  private TcpPacket tcpPacket = null;
+
+  /** The udp packet. */
+  private UdpPacket udpPacket = null;
+
+  /** The global header. */
+  private GlobalHeader globalHeader = null;
+
+  /** The Constant globalHeaderJsonTemplateSB. */
+  private static final StringBuffer globalHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant ipv4HeaderJsonTemplateSB. */
+  private static final StringBuffer ipv4HeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant tcpHeaderJsonTemplateSB. */
+  private static final StringBuffer tcpHeaderJsonTemplateSB = new StringBuffer();
+
+  /** The Constant udpHeaderJsonTemplateSB. */
+  private static final StringBuffer udpHeaderJsonTemplateSB = new StringBuffer();
+
+  static {
+    globalHeaderJsonTemplateSB.append("<\"global_header\":<\"pcap_id\":\"").append("{0}").append('"');
+    globalHeaderJsonTemplateSB.append(",\"inc_len\":").append("{1}");
+    globalHeaderJsonTemplateSB.append(",\"orig_len\":").append("{2}");
+    globalHeaderJsonTemplateSB.append(",\"ts_sec\":").append("{3}");
+    globalHeaderJsonTemplateSB.append(",\"ts_usec\":").append("{4}");
+    globalHeaderJsonTemplateSB.append(">,"); // NOPMD by sheetal on 1/29/14 2:37
+    // PM
+
+    // ipv4 header
+
+    ipv4HeaderJsonTemplateSB.append("\"ipv4_header\":");
+
+    ipv4HeaderJsonTemplateSB.append("\"ip_dst\":").append("{0}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_dst_addr\":\"").append("{1}");
+    ipv4HeaderJsonTemplateSB.append("\",\"ip_flags\":").append("{2}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_fragment_offset\":").append("{3}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_header_checksum\":").append("{4}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_id\":").append("{5}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_header_length\":").append("{6}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_protocol\":").append("{7}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_src\":").append("{8}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_src_addr\":\"").append("{9}");
+    ipv4HeaderJsonTemplateSB.append("\",\"ip_tos\":").append("{10}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_total_length\":").append("{11}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_ttl\":").append("{12}");
+    ipv4HeaderJsonTemplateSB.append(",\"ip_version\":").append("{13}");
+    ipv4HeaderJsonTemplateSB.append('>');
+
+    // tcp header
+    tcpHeaderJsonTemplateSB.append(",\"tcp_header\":<\"ack\":").append("{0}");
+    tcpHeaderJsonTemplateSB.append(",\"checksum\":").append("{1}");
+    tcpHeaderJsonTemplateSB.append(",\"data_length\":").append("{2}");
+    tcpHeaderJsonTemplateSB.append(",\"data_offset\":").append("{3}");
+    tcpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+    tcpHeaderJsonTemplateSB.append("\",\"dst_port\":").append("{5}");
+    tcpHeaderJsonTemplateSB.append(",\"direction\":").append("{6}");
+    tcpHeaderJsonTemplateSB.append(",\"flags\":").append("{7}");
+    tcpHeaderJsonTemplateSB.append(",\"reassembled_length \":").append("{8}");
+    tcpHeaderJsonTemplateSB.append(",\"relative_ack\":").append("{9}");
+    tcpHeaderJsonTemplateSB.append(",\"relative_seq\":").append("{10}");
+    tcpHeaderJsonTemplateSB.append(",\"seq\":").append("{11}");
+    tcpHeaderJsonTemplateSB.append(",\"session_key\":\"").append("{12}");
+    tcpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{13}");
+    tcpHeaderJsonTemplateSB.append("\",\"src_port\":").append("{14}");
+    tcpHeaderJsonTemplateSB.append(",\"total_length\":").append("{15}");
+    tcpHeaderJsonTemplateSB.append(",\"urgent_pointer\":").append("{16}");
+    tcpHeaderJsonTemplateSB.append(",\"window\":").append("{17}");
+    tcpHeaderJsonTemplateSB.append(">>");
+
+    // udp headers
+    udpHeaderJsonTemplateSB.append(",\"udp_header\":<\"checksum\":").append("{0}");
+    udpHeaderJsonTemplateSB.append(",\"dst_port\":").append("{1}");
+    udpHeaderJsonTemplateSB.append(",\"length\":").append("{2}");
+    udpHeaderJsonTemplateSB.append(",\"src_port\":").append("{3}");
+    udpHeaderJsonTemplateSB.append(",\"dst_addr\":\"").append("{4}");
+    udpHeaderJsonTemplateSB.append("\",\"src_addr\":\"").append("{5}").append('"');
+    tcpHeaderJsonTemplateSB.append(">>");
+
+  }
+
+  /** The Constant globalHeaderJsonTemplateString. */
+  private static final String globalHeaderJsonTemplateString = globalHeaderJsonTemplateSB.toString();
+
+  /** The Constant ipv4HeaderJsonTemplateString. */
+  private static final String ipv4HeaderJsonTemplateString = ipv4HeaderJsonTemplateSB.toString();
+
+  /** The Constant tcpHeaderJsonTemplateString. */
+  private static final String tcpHeaderJsonTemplateString = tcpHeaderJsonTemplateSB.toString();
+
+  /** The Constant udpHeaderJsonTemplateString. */
+  private static final String udpHeaderJsonTemplateString = udpHeaderJsonTemplateSB.toString();
+
+  /**
+   * Instantiates a new packet info.
+   * 
+   * @param globalHeader
+   *          the global header
+   * @param packetHeader
+   *          the packet header
+   * @param packet
+   *          the packet
+   * @param ipv4Packet
+   *          the ipv4 packet
+   * @param tcpPacket
+   *          the tcp packet
+   * @param udpPacket
+   *          the udp packet
+   */
+  public PacketInfo(GlobalHeader globalHeader, PacketHeader packetHeader, PcapPacket packet, Ipv4Packet ipv4Packet, TcpPacket tcpPacket,
+      UdpPacket udpPacket) {
+    this.packetHeader = packetHeader;
+    this.packet = packet;
+    this.ipv4Packet = ipv4Packet;
+    this.tcpPacket = tcpPacket;
+    this.udpPacket = udpPacket;
+    this.globalHeader = globalHeader;
+  }
+
+  /**
+   * Gets the global header.
+   * 
+   * @return the global header
+   */
+  public GlobalHeader getGlobalHeader() {
+    return globalHeader;
+  }
+
+  /**
+   * Gets the packet header.
+   * 
+   * 
+   * @return the packet header
+   */
+  public PacketHeader getPacketHeader() {
+    return packetHeader;
+  }
+
+  /**
+   * Gets the packet.
+   * 
+   * 
+   * @return the packet
+   */
+  public PcapPacket getPacket() {
+    return packet;
+  }
+
+  /**
+   * Gets the ipv4 packet.
+   * 
+   * 
+   * @return the ipv4 packet
+   */
+  public Ipv4Packet getIpv4Packet() {
+    return ipv4Packet;
+  }
+
+  /**
+   * Gets the tcp packet.
+   * 
+   * 
+   * @return the tcp packet
+   */
+  public TcpPacket getTcpPacket() {
+    return tcpPacket;
+  }
+
+  /**
+   * Gets the udp packet.
+   * 
+   * 
+   * @return the udp packet
+   */
+  public UdpPacket getUdpPacket() {
+    return udpPacket;
+  }
+
+  /**
+   * Gets the key.
+   * 
+   * 
+   * @return the key
+   */
+  public String getKey() {
+    int sourcePort = 0;
+    int destinationPort = 0;
+    if (Constants.PROTOCOL_UDP == ipv4Packet.getProtocol()) {
+      sourcePort = udpPacket.getSourcePort();
+
+      destinationPort = udpPacket.getDestinationPort();
+
+    } else if (Constants.PROTOCOL_TCP == ipv4Packet.getProtocol()) {
+      sourcePort = tcpPacket.getSourcePort();
+
+      destinationPort = tcpPacket.getDestinationPort();
+
+    }
+
+    return PcapUtils.getSessionKey(ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet.getDestinationAddress().getHostAddress(),
+        ipv4Packet.getProtocol(), sourcePort, destinationPort, ipv4Packet.getId(), ipv4Packet.getFragmentOffset());
+
+  }
+
+  /**
+   * Gets the json doc.
+   * 
+   * 
+   * @return the json doc
+   */
+  public String getJsonDoc() {
+
+    return getJsonDocUsingSBAppend();
+  }
+
+  /**
+   * Gets the json doc.
+   * 
+   * 
+   * @return the json doc
+   */
+  public String getJsonIndexDoc() {
+
+    return getJsonIndexDocUsingSBAppend();
+  }
+
+  /**
+   * Gets the json doc using sb append.
+   * 
+   * @return the json doc using sb append
+   */
+  private String getJsonDocUsingSBAppend() {
+
+    StringBuffer jsonSb = new StringBuffer(1024);
+
+    // global header
+    jsonSb.append("{\"global_header\":{\"pcap_id\":\"").append(getKey());
+    jsonSb.append("\",\"inc_len\":").append(packetHeader.getInclLen());
+    jsonSb.append(",\"orig_len\":").append(packetHeader.getOrigLen());
+    jsonSb.append(",\"ts_sec\":").append(packetHeader.getTsSec());
+    jsonSb.append(",\"ts_usec\":").append(packetHeader.getTsUsec());
+    jsonSb.append("},"); // NOPMD by sheetal on 1/29/14 2:37 PM
+
+    // ipv4 header
+
+    jsonSb.append("\"ipv4_header\":{");
+
+    jsonSb.append("\"ip_dst\":").append(ipv4Packet.getDestination());
+    jsonSb.append(",\"ip_dst_addr\":\"").append(ipv4Packet.getDestinationAddress().getHostAddress());
+    jsonSb.append("\",\"ip_flags\":").append(ipv4Packet.getFlags());
+    jsonSb.append(",\"ip_fragment_offset\":").append(ipv4Packet.getFragmentOffset());
+    jsonSb.append(",\"ip_header_checksum\":").append(ipv4Packet.getHeaderChecksum());
+    jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+    jsonSb.append(",\"ip_header_length\":").append(ipv4Packet.getIhl());
+    jsonSb.append(",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+    jsonSb.append(",\"ip_src\":").append(ipv4Packet.getSource());
+    jsonSb.append(",\"ip_src_addr\":\"").append(ipv4Packet.getSourceAddress().getHostAddress());
+    jsonSb.append("\",\"ip_tos\":").append(ipv4Packet.getTos());
+    jsonSb.append(",\"ip_total_length\":").append(ipv4Packet.getTotalLength());
+    jsonSb.append(",\"ip_ttl\":").append(ipv4Packet.getTtl());
+    jsonSb.append(",\"ip_version\":").append(ipv4Packet.getVersion());
+    jsonSb.append('}');
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(",\"tcp_header\":{\"ack\":").append(tcpPacket.getAck());
+      jsonSb.append(",\"checksum\":").append(tcpPacket.getChecksum());
+      jsonSb.append(",\"data_length\":").append(tcpPacket.getDataLength());
+      jsonSb.append(",\"data_offset\":").append(tcpPacket.getDataOffset());
+      jsonSb.append(",\"dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"dst_port\":").append(tcpPacket.getDestinationPort());
+      jsonSb.append(",\"direction\":").append(tcpPacket.getDirection());
+      jsonSb.append(",\"flags\":").append(tcpPacket.getFlags());
+      jsonSb.append(",\"reassembled_length \":").append(tcpPacket.getReassembledLength());
+      jsonSb.append(",\"relative_ack\":").append(tcpPacket.getRelativeAck());
+      jsonSb.append(",\"relative_seq\":").append(tcpPacket.getRelativeSeq());
+      jsonSb.append(",\"seq\":").append(tcpPacket.getSeq());
+      jsonSb.append(",\"session_key\":\"").append(tcpPacket.getSessionKey());
+      jsonSb.append("\",\"src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"total_length\":").append(tcpPacket.getTotalLength());
+      jsonSb.append(",\"urgent_pointer\":").append(tcpPacket.getUrgentPointer());
+      jsonSb.append(",\"window\":").append(tcpPacket.getWindow());
+      jsonSb.append('}');
+    }
+
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(",\"udp_header\":{\"checksum\":").append(udpPacket.getChecksum());
+      jsonSb.append(",\"dst_port\":").append(udpPacket.getDestinationPort());
+      jsonSb.append(",\"length\":").append(udpPacket.getLength());
+      jsonSb.append(",\"src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\"}");
+    }
+
+    jsonSb.append('}');
+
+    return jsonSb.toString();
+  }
+
+  /**
+   * Gets the json doc using message format.
+   * 
+   * @return the json doc using message format
+   */
+  private String getJsonDocUsingMessageFormat() {
+
+    StringBuffer jsonSb = new StringBuffer(600);
+
+    jsonSb.append(MessageFormat.format(globalHeaderJsonTemplateString, getKey(), packetHeader.getInclLen(), packetHeader.getOrigLen(),
+        packetHeader.getTsSec(), packetHeader.getTsUsec()));
+
+    jsonSb.append(MessageFormat.format(ipv4HeaderJsonTemplateString, ipv4Packet.getDestination(), ipv4Packet.getDestinationAddress()
+        .getHostAddress(), ipv4Packet.getFlags(), ipv4Packet.getFragmentOffset(), ipv4Packet.getHeaderChecksum(), ipv4Packet.getId(),
+        ipv4Packet.getIhl(), ipv4Packet.getProtocol(), ipv4Packet.getSource(), ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet
+            .getTos(), ipv4Packet.getTotalLength(), ipv4Packet.getTtl(), ipv4Packet.getVersion()));
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(MessageFormat.format(tcpHeaderJsonTemplateString, tcpPacket.getAck(), tcpPacket.getChecksum(), tcpPacket
+          .getDataLength(), tcpPacket.getDataOffset(), tcpPacket.getDestinationAddress().getHostAddress(), tcpPacket.getDestinationPort(),
+          tcpPacket.getDirection(), tcpPacket.getFlags(), tcpPacket.getReassembledLength(), tcpPacket.getRelativeAck(), tcpPacket
+              .getRelativeSeq(), tcpPacket.getSeq(), tcpPacket.getSessionKey(), tcpPacket.getSourceAddress().getHostAddress(), tcpPacket
+              .getSourcePort(), tcpPacket.getTotalLength(), tcpPacket.getUrgentPointer(), tcpPacket.getWindow()));
+    } else
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(MessageFormat.format(udpHeaderJsonTemplateString, udpPacket.getChecksum(), udpPacket.getDestinationPort(),
+          udpPacket.getLength(), udpPacket.getSourcePort(), udpPacket.getDestination().getAddress().getHostAddress(), udpPacket.getSource()
+              .getAddress().getHostAddress()));
+
+    } else {
+      jsonSb.append('}');
+    }
+    return jsonSb.toString().replace('<', '{').replace('>', '}');
+  }
+
+  /**
+   * Gets the json index doc using sb append.
+   * 
+   * @return the json index doc using sb append
+   */
+  private String getJsonIndexDocUsingSBAppend() {
+
+    StringBuffer jsonSb = new StringBuffer(175);
+
+    jsonSb.append("{\"pcap_id\":\"").append(getKey());
+    jsonSb.append("\",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+
+    // tcp header
+    if (tcpPacket != null) {
+      jsonSb.append(",\"src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"dst_port\":").append(tcpPacket.getDestinationPort());
+    }
+
+    // udp headers
+    if (udpPacket != null) {
+      jsonSb.append(",\"src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\",\"src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"dst_port\":").append(udpPacket.getDestinationPort());
+    }
+
+    jsonSb.append('}');
+
+    return jsonSb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteInputStream.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteInputStream.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteInputStream.java
new file mode 100644
index 0000000..44f3597
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteInputStream.java
@@ -0,0 +1,168 @@
+package com.opensoc.pcap;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.krakenapps.pcap.PcapInputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+import org.krakenapps.pcap.util.ByteOrderConverter;
+import org.krakenapps.pcap.util.ChainBuffer;
+
+/**
+ * The Class PcapByteInputStream.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteInputStream implements PcapInputStream {
+
+  /** The is. */
+  private DataInputStream is;
+
+  /** The global header. */
+  private GlobalHeader globalHeader;
+
+  /**
+   * Opens pcap file input stream.
+   * 
+   * @param pcap
+   *          the byte array to be read
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public PcapByteInputStream(byte[] pcap) throws IOException {
+    is = new DataInputStream(new ByteArrayInputStream(pcap)); // $codepro.audit.disable
+                                                              // closeWhereCreated
+    readGlobalHeader();
+  }
+
+  /**
+   * Reads a packet from pcap byte array.
+   * 
+   * @return the packet throws IOException the stream has been closed and the
+   *         contained input stream does not support reading after close, or
+   *         another I/O error occurs. * @throws IOException Signals that an I/O
+   *         exception has occurred. * @see
+   *         org.krakenapps.pcap.PcapInputStream#getPacket()
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+
+  public PcapPacket getPacket() throws IOException {
+    return readPacket(globalHeader.getMagicNumber());
+  }
+
+  /**
+   * Gets the global header.
+   * 
+   * 
+   * @return the global header
+   */
+  public GlobalHeader getGlobalHeader() {
+    return globalHeader;
+  }
+
+  /**
+   * Read global header.
+   * 
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void readGlobalHeader() throws IOException {
+    int magic = is.readInt();
+    short major = is.readShort();
+    short minor = is.readShort();
+    int tz = is.readInt();
+    int sigfigs = is.readInt();
+    int snaplen = is.readInt();
+    int network = is.readInt();
+
+    globalHeader = new GlobalHeader(magic, major, minor, tz, sigfigs, snaplen,
+        network);
+
+    if (globalHeader.getMagicNumber() == 0xD4C3B2A1) {
+      globalHeader.swapByteOrder();
+    }
+  }
+
+  /**
+   * Read packet.
+   * 
+   * @param magicNumber
+   *          the magic number
+   * @return the pcap packet * @throws IOException Signals that an I/O exception
+   *         has occurred. * @throws EOFException the EOF exception
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private PcapPacket readPacket(int magicNumber) throws IOException {
+    PacketHeader packetHeader = readPacketHeader(magicNumber);
+    Buffer packetData = readPacketData(packetHeader.getInclLen());
+    return new PcapPacket(packetHeader, packetData);
+  }
+
+  /**
+   * Read packet header.
+   * 
+   * @param magicNumber
+   *          the magic number
+   * @return the packet header * @throws IOException Signals that an I/O
+   *         exception has occurred. * @throws EOFException the EOF exception
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private PacketHeader readPacketHeader(int magicNumber) throws IOException {
+    int tsSec = is.readInt();
+    int tsUsec = is.readInt();
+    int inclLen = is.readInt();
+    int origLen = is.readInt();
+
+    if (magicNumber == 0xD4C3B2A1) {
+      tsSec = ByteOrderConverter.swap(tsSec);
+      tsUsec = ByteOrderConverter.swap(tsUsec);
+      inclLen = ByteOrderConverter.swap(inclLen);
+      origLen = ByteOrderConverter.swap(origLen);
+    }
+
+    return new PacketHeader(tsSec, tsUsec, inclLen, origLen);
+  }
+
+  /**
+   * Read packet data.
+   * 
+   * @param packetLength
+   *          the packet length
+   * @return the buffer * @throws IOException Signals that an I/O exception has
+   *         occurred.
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private Buffer readPacketData(int packetLength) throws IOException {
+    byte[] packets = new byte[packetLength];
+    is.read(packets);
+
+    Buffer payload = new ChainBuffer();
+    payload.addLast(packets);
+    return payload;
+    // return new PacketPayload(packets);
+  }
+
+  /**
+   * Closes pcap stream handle.
+   * 
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred. * @see
+   *           org.krakenapps.pcap.PcapInputStream#close()
+   */
+
+  public void close() throws IOException {
+    is.close(); // $codepro.audit.disable closeInFinally
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
new file mode 100644
index 0000000..8d06caa
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
@@ -0,0 +1,394 @@
+package com.opensoc.pcap;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
+/**
+ * The Class PcapUtils.
+ */
+public class PcapUtils {
+
+  /** The Constant SESSION_KEY_SEPERATOR. */
+  private static final char SESSION_KEY_SEPERATOR = '-';
+
+  /** The Constant protocolIdToNameMap. */
+  private static final BiMap<Integer, String> protocolIdToNameMap = HashBiMap
+      .create();
+
+  // private static final Map<Integer, String> protocolIdToNameMap = new
+  // HashMap();
+
+  static {
+
+    protocolIdToNameMap.put(0, "HOPOPT");
+    protocolIdToNameMap.put(1, "ICMP");
+    protocolIdToNameMap.put(2, "IGMP");
+    protocolIdToNameMap.put(3, "GGP");
+    protocolIdToNameMap.put(4, "IPV4");
+    protocolIdToNameMap.put(5, "ST");
+    protocolIdToNameMap.put(6, "TCP");
+    protocolIdToNameMap.put(7, "CBT");
+    protocolIdToNameMap.put(8, "EGP");
+    protocolIdToNameMap.put(9, "IGP");
+    protocolIdToNameMap.put(10, "BBN-RCC-MON");
+    protocolIdToNameMap.put(11, "NVP-II");
+    protocolIdToNameMap.put(12, "PUP");
+    protocolIdToNameMap.put(13, "ARGUS");
+    protocolIdToNameMap.put(14, "EMCON");
+    protocolIdToNameMap.put(15, "XNET");
+    protocolIdToNameMap.put(16, "CHAOS");
+    protocolIdToNameMap.put(17, "UDP");
+    protocolIdToNameMap.put(18, "MUX");
+    protocolIdToNameMap.put(19, "DCN-MEAS");
+    protocolIdToNameMap.put(20, "HMP");
+    protocolIdToNameMap.put(21, "PRM");
+    protocolIdToNameMap.put(22, "XNS-IDP");
+    protocolIdToNameMap.put(23, "TRUNK-1");
+    protocolIdToNameMap.put(24, "TRUNK-2");
+    protocolIdToNameMap.put(25, "LEAF-1");
+    protocolIdToNameMap.put(26, "LEAF-2");
+    protocolIdToNameMap.put(27, "RDP");
+    protocolIdToNameMap.put(28, "IRTP");
+    protocolIdToNameMap.put(29, "ISO-TP4");
+    protocolIdToNameMap.put(30, "NETBLT");
+    protocolIdToNameMap.put(31, "MFE-NSP");
+    protocolIdToNameMap.put(32, "MERIT-INP");
+    protocolIdToNameMap.put(33, "DCCP");
+    protocolIdToNameMap.put(34, "3PC");
+    protocolIdToNameMap.put(35, "IDPR");
+    protocolIdToNameMap.put(36, "XTP");
+    protocolIdToNameMap.put(37, "DDP");
+    protocolIdToNameMap.put(38, "IDPR-CMTP");
+    protocolIdToNameMap.put(39, "TP++");
+    protocolIdToNameMap.put(40, "IL");
+    protocolIdToNameMap.put(41, "IPV6");
+    protocolIdToNameMap.put(42, "SDRP");
+    protocolIdToNameMap.put(43, "IPV6-ROUTE");
+    protocolIdToNameMap.put(44, "IPV6-FRAG");
+    protocolIdToNameMap.put(45, "IDRP");
+    protocolIdToNameMap.put(46, "RSVP");
+    protocolIdToNameMap.put(47, "GRE");
+    protocolIdToNameMap.put(48, "DSR");
+    protocolIdToNameMap.put(49, "BNA");
+    protocolIdToNameMap.put(50, "ESP");
+    protocolIdToNameMap.put(51, "AH");
+    protocolIdToNameMap.put(52, "I-NLSP");
+    protocolIdToNameMap.put(53, "SWIPE");
+    protocolIdToNameMap.put(54, "NARP");
+    protocolIdToNameMap.put(55, "MOBILE");
+    protocolIdToNameMap.put(56, "TLSP");
+    protocolIdToNameMap.put(57, "SKIP");
+    protocolIdToNameMap.put(58, "IPV6-ICMP");
+    protocolIdToNameMap.put(59, "IPV6-NONXT");
+    protocolIdToNameMap.put(60, "IPV6-OPTS");
+    protocolIdToNameMap.put(62, "CFTP");
+    protocolIdToNameMap.put(64, "SAT-EXPAK");
+    protocolIdToNameMap.put(65, "KRYPTOLAN");
+    protocolIdToNameMap.put(66, "RVD");
+    protocolIdToNameMap.put(67, "IPPC");
+    protocolIdToNameMap.put(69, "SAT-MON");
+    protocolIdToNameMap.put(70, "VISA");
+    protocolIdToNameMap.put(71, "IPCV");
+    protocolIdToNameMap.put(72, "CPNX");
+    protocolIdToNameMap.put(73, "CPHB");
+    protocolIdToNameMap.put(74, "WSN");
+    protocolIdToNameMap.put(75, "PVP");
+    protocolIdToNameMap.put(76, "BR-SAT-MON");
+    protocolIdToNameMap.put(77, "SUN-ND");
+    protocolIdToNameMap.put(78, "WB-MON");
+    protocolIdToNameMap.put(79, "WB-EXPAK");
+    protocolIdToNameMap.put(80, "ISO-IP");
+    protocolIdToNameMap.put(81, "VMTP");
+    protocolIdToNameMap.put(82, "SECURE-VMTP");
+    protocolIdToNameMap.put(83, "VINES");
+    protocolIdToNameMap.put(84, "TTP");
+    protocolIdToNameMap.put(85, "NSFNET-IGP");
+    protocolIdToNameMap.put(86, "DGP");
+    protocolIdToNameMap.put(87, "TCF");
+    protocolIdToNameMap.put(88, "EIGRP");
+    protocolIdToNameMap.put(89, "OSPFIGP");
+    protocolIdToNameMap.put(90, "SPRITE-RPC");
+    protocolIdToNameMap.put(91, "LARP");
+    protocolIdToNameMap.put(92, "MTP");
+    protocolIdToNameMap.put(93, "AX.25");
+    protocolIdToNameMap.put(94, "IPIP");
+    protocolIdToNameMap.put(95, "MICP");
+    protocolIdToNameMap.put(96, "SCC-SP");
+    protocolIdToNameMap.put(97, "ETHERIP");
+    protocolIdToNameMap.put(98, "ENCAP");
+    protocolIdToNameMap.put(100, "GMTP");
+    protocolIdToNameMap.put(101, "IFMP");
+    protocolIdToNameMap.put(102, "PNNI");
+    protocolIdToNameMap.put(103, "PIM");
+    protocolIdToNameMap.put(104, "ARIS");
+    protocolIdToNameMap.put(105, "SCPS");
+    protocolIdToNameMap.put(106, "QNX");
+    protocolIdToNameMap.put(107, "A/N");
+    protocolIdToNameMap.put(108, "IPCOMP");
+    protocolIdToNameMap.put(109, "SNP");
+    protocolIdToNameMap.put(110, "COMPAQ-PEER");
+    protocolIdToNameMap.put(111, "IPX-IN-IP");
+    protocolIdToNameMap.put(112, "VRRP");
+    protocolIdToNameMap.put(113, "PGM");
+    protocolIdToNameMap.put(115, "L2TP");
+    protocolIdToNameMap.put(116, "DDX");
+    protocolIdToNameMap.put(117, "IATP");
+    protocolIdToNameMap.put(118, "STP");
+    protocolIdToNameMap.put(119, "SRP");
+    protocolIdToNameMap.put(120, "UTI");
+    protocolIdToNameMap.put(121, "SMP");
+    protocolIdToNameMap.put(122, "SM");
+    protocolIdToNameMap.put(123, "PTP");
+    protocolIdToNameMap.put(124, "ISIS OVER IPV4");
+    protocolIdToNameMap.put(125, "FIRE");
+    protocolIdToNameMap.put(126, "CRTP");
+    protocolIdToNameMap.put(127, "CRUDP");
+    protocolIdToNameMap.put(128, "SSCOPMCE");
+    protocolIdToNameMap.put(129, "IPLT");
+    protocolIdToNameMap.put(130, "SPS");
+    protocolIdToNameMap.put(131, "PIPE");
+    protocolIdToNameMap.put(132, "SCTP");
+    protocolIdToNameMap.put(133, "FC");
+    protocolIdToNameMap.put(134, "RSVP-E2E-IGNORE");
+    protocolIdToNameMap.put(135, "MOBILITY HEADER");
+    protocolIdToNameMap.put(136, "UDPLITE");
+    protocolIdToNameMap.put(137, "MPLS-IN-IP");
+    protocolIdToNameMap.put(138, "MANET");
+    protocolIdToNameMap.put(139, "HIP");
+    protocolIdToNameMap.put(140, "SHIM6");
+    protocolIdToNameMap.put(141, "WESP");
+    protocolIdToNameMap.put(142, "ROHC");
+  }
+
+  /** The Constant protocolNameToIdMap. */
+  private static final BiMap<String, Integer> protocolNameToIdMap = protocolIdToNameMap
+      .inverse();
+
+  // private static final Map<String, Integer> protocolNameToIdMap =
+  // invertMap(protocolIdToNameMap);
+
+  /**
+   * Convert ipv4 ip to hex.
+   * 
+   * @param ipAddress
+   *          the ip address
+   * @return the string
+   */
+  public static String convertIpv4IpToHex(String ipAddress) {
+    StringBuffer hexIp = new StringBuffer(64);
+    String[] ipSegments = ipAddress.split("\\.");
+
+    for (String ipSegment : ipSegments) {
+      hexIp.append(convertIpSegmentToHex(ipSegment));
+    }
+
+    return hexIp.toString();
+
+  }
+
+  /**
+   * Gets the session key.
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort) {
+    return getSessionKey(srcIp, dstIp, protocol, srcPort, dstPort, null, null);
+  }
+
+  /**
+   * Gets the session key.
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @param ipId
+   *          the ip id
+   * @param fragmentOffset
+   *          the fragment offset
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp,
+      String protocol, String srcPort, String dstPort, String ipId,
+      String fragmentOffset) {
+
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(SESSION_KEY_SEPERATOR)
+        .append(convertIpv4IpToHex(dstIp)).append(SESSION_KEY_SEPERATOR)
+        .append(protocol == null ? "0" : protocol)
+        .append(SESSION_KEY_SEPERATOR).append(srcPort == null ? "0" : srcPort)
+        .append(SESSION_KEY_SEPERATOR).append(dstPort == null ? "0" : dstPort)
+        .append(SESSION_KEY_SEPERATOR).append(ipId == null ? "0" : ipId)
+        .append(SESSION_KEY_SEPERATOR)
+        .append(fragmentOffset == null ? "0" : fragmentOffset);
+
+    return sb.toString();
+  }
+
+  /**
+   * Gets the session key.
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @param ipId
+   *          the ip id
+   * @param fragmentOffset
+   *          the fragment offset
+   * @return the session key
+   */
+  public static String getSessionKey(String srcIp, String dstIp, int protocol,
+      int srcPort, int dstPort, int ipId, int fragmentOffset) {
+    String keySeperator = "-";
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(keySeperator)
+        .append(convertIpv4IpToHex(dstIp)).append(keySeperator)
+        .append(protocol).append(keySeperator).append(srcPort)
+        .append(keySeperator).append(dstPort).append(keySeperator).append(ipId)
+        .append(keySeperator).append(fragmentOffset);
+
+    return sb.toString();
+  }
+
+  // public static String convertPortToHex(String portNumber) {
+  // return convertPortToHex(Integer.valueOf(portNumber));
+  //
+  // }
+  //
+  // public static String convertPortToHex(int portNumber) {
+  // return convertToHex(portNumber, 4);
+  //
+  // }
+  //
+  // public static String convertProtocolToHex(String protocol) {
+  // return convertProtocolToHex(Integer.valueOf(protocol));
+  //
+  // }
+  //
+  // public static String convertProtocolToHex(int protocol) {
+  // return convertToHex(protocol, 2);
+  // }
+
+  /**
+   * Convert ip segment to hex.
+   * 
+   * @param ipSegment
+   *          the ip segment
+   * @return the string
+   */
+  public static String convertIpSegmentToHex(String ipSegment) {
+    return convertIpSegmentToHex(Integer.valueOf(ipSegment));
+
+  }
+
+  /**
+   * Convert ip segment to hex.
+   * 
+   * @param ipSegment
+   *          the ip segment
+   * @return the string
+   */
+  public static String convertIpSegmentToHex(int ipSegment) {
+    return convertToHex(ipSegment, 2);
+
+  }
+
+  /**
+   * Convert to hex.
+   * 
+   * @param number
+   *          the number
+   * @param length
+   *          the length
+   * @return the string
+   */
+  public static String convertToHex(int number, int length) {
+    return StringUtils.leftPad(Integer.toHexString(number), length, '0');
+
+  }
+
+  /**
+   * Gets the protocol name.
+   * 
+   * @param protocolNumber
+   *          the protocol number
+   * 
+   * @return the protocol name
+   */
+  public static String getProtocolNameFromId(int protocolNumber) {
+    String protocolName = protocolIdToNameMap.get(protocolNumber);
+
+    if (protocolName == null) {
+      protocolName = String.valueOf(protocolNumber);
+    }
+    return protocolName;
+  }
+
+  /**
+   * Gets the protocol id from name.
+   * 
+   * @param protocolName
+   *          the protocol name
+   * @return the protocol id from name
+   */
+  public static int getProtocolIdFromName(String protocolName) {
+    Integer protocolNumber = protocolNameToIdMap
+        .get(protocolName.toUpperCase());
+
+    if (protocolNumber == null) {
+      protocolNumber = -1;
+    }
+    return protocolNumber;
+  }
+
+  /**
+   * Invert map.
+   * 
+   * @param <V>
+   *          the value type
+   * @param <K>
+   *          the key type
+   * @param map
+   *          the map
+   * @return the map
+   */
+  private static <V, K> Map<V, K> invertMap(Map<K, V> map) {
+
+    Map<V, K> inv = new HashMap<V, K>();
+
+    for (Entry<K, V> entry : map.entrySet())
+      inv.put(entry.getValue(), entry.getKey());
+
+    return inv;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java
new file mode 100644
index 0000000..db2c2b2
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java
@@ -0,0 +1,5 @@
+package com.opensoc.pcap;
+
+public class asdf {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/AbstractTestContext.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/AbstractTestContext.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/AbstractTestContext.java
new file mode 100644
index 0000000..7f7f34a
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/AbstractTestContext.java
@@ -0,0 +1,189 @@
+
+ 
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.test;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+ /**
+ * <ul>
+ * <li>Title: </li>
+ * <li>Description: The class <code>AbstractTestContext</code> is
+ * an abstract base class for implementing JUnit tests that need to load a
+ * test properties. The <code>setup</code> method will attempt to
+ * load a properties from a file, located in src/test/resources,
+ * with the same name as the class.</li>
+ * <li>Created: Aug 7, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class AbstractTestContext  extends TestCase{
+         /**
+         * The testProps.
+         */
+        protected File testPropFile=null;
+
+        /**
+         * The properties loaded for test.
+         */
+        protected Properties testProperties=new Properties();
+        
+        /**
+         * Any Object for mavenMode
+         * @parameter
+         *   expression="${mode}"
+         *   default-value="local"
+         */
+         private Object mode="local";        
+
+        /**
+         * Constructs a new <code>AbstractTestContext</code> instance.
+         */
+        public AbstractTestContext() {
+            super();
+        }
+
+        /**
+         * Constructs a new <code>AbstractTestContext</code> instance.
+         * @param name the name of the test case.
+         */
+        public AbstractTestContext(String name) {
+            super(name);
+            try{
+                if(System.getProperty("mode")!=null){
+                    setMode(System.getProperty("mode") );                
+                }else
+                {
+                    setMode("local");
+                }
+            }catch(Exception ex){
+                setMode("local");
+            }            
+        }
+
+        /*
+         * (non-Javadoc)
+         * @see junit.framework.TestCase#setUp()
+         */
+        @Override
+        protected void setUp() throws Exception {
+            InputStream input=null;
+            File directory = new File("src/test/resources");
+            if (!directory.isDirectory()) {
+                return;
+            }
+            File file = new File(directory, getClass().getSimpleName() + ".properties");
+            if (!file.canRead()) {
+                return;
+            }
+            setTestPropFile(file);
+            try{
+                input=new FileInputStream(file);
+                testProperties.load(input);
+            }catch(IOException ex){
+                ex.printStackTrace();
+                fail("failed to load properties");
+            }
+            
+            
+        }
+
+        /*
+         * (non-Javadoc)
+         * @see junit.framework.TestCase#tearDown()
+         */
+        @Override
+        protected void tearDown() throws Exception {
+
+        }
+
+        /**
+         * Returns the testProperties.
+         * @return the testProperties.
+         */
+        
+        public Properties getTestProperties() {
+            return testProperties;
+        }
+
+        /**
+         * Sets the testProperties.
+         * @param testProperties the testProperties.
+         */
+        
+        public void setTestProperties(Properties testProperties) {
+        
+            this.testProperties = testProperties;
+        }    
+        /**
+        * Returns the testPropFile.
+        * @return the testPropFile.
+        */
+       
+       public File getTestPropFile() {
+           return testPropFile;
+       }
+
+       /**
+        * Sets the testPropFile.
+        * @param testPropFile the testPropFile.
+        */
+       
+       public void setTestPropFile(File testPropFile) {
+       
+           this.testPropFile = testPropFile;
+       }     
+       
+       /**
+        * Skip Tests
+        */
+       public boolean skipTests(Object mode){
+           if(mode.toString().equals("local")){
+               return true;
+           }else {
+               return false;
+           }
+       }
+       
+       /**
+        * Returns the mode.
+        * @return the mode.
+        */
+       
+       public Object getMode() {
+           return mode;
+       }
+
+       /**
+        * Sets the mode.
+        * @param mode the mode.
+        */
+       
+       public void setMode(Object mode) {
+       
+           this.mode = mode;
+       }
+     
+    }
+
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/ISEParserTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/ISEParserTest.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/ISEParserTest.java
new file mode 100644
index 0000000..47061b6
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/test/ISEParserTest.java
@@ -0,0 +1,27 @@
+package com.opensoc.test;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.json.simple.JSONObject;
+
+import com.opensoc.ise.parser.ISEParser;
+import com.opensoc.ise.parser.ParseException;
+
+public class ISEParserTest {
+
+	public static void main(String[] args) throws ParseException, IOException {
+	}
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/Cli.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/Cli.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/Cli.java
new file mode 100644
index 0000000..9f8bae3
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/Cli.java
@@ -0,0 +1,186 @@
+package com.opensoc.topologyhelpers;
+
+import java.io.File;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class Cli {
+
+	private String[] args = null;
+	private Options options = new Options();
+
+	private String path = null;
+	private boolean debug = true;
+	private boolean local_mode = true;
+	private boolean generator_spout = false;
+
+	public boolean isGenerator_spout() {
+		return generator_spout;
+	}
+
+	public void setGenerator_spout(boolean generator_spout) {
+		this.generator_spout = generator_spout;
+	}
+
+	public String getPath() {
+		return path;
+	}
+
+	public void setPath(String path) {
+		this.path = path;
+	}
+
+	public boolean isDebug() {
+		return debug;
+	}
+
+	public void setDebug(boolean debug) {
+		this.debug = debug;
+	}
+
+	public boolean isLocal_mode() {
+		return local_mode;
+	}
+
+	public void setLocal_mode(boolean local_mode) {
+		this.local_mode = local_mode;
+	}
+
+	public Cli(String[] args) {
+
+		this.args = args;
+
+		Option help = new Option("h", "Display help menue");
+		options.addOption(help);
+		options.addOption(
+				"config_path",
+				true,
+				"OPTIONAL ARGUMENT [/path/to/configs] Path to configuration folder. If not provided topology will initialize with default configs");
+		options.addOption(
+				"local_mode",
+				true,
+				"REQUIRED ARGUMENT [true|false] Local mode or cluster mode.  If set to true the topology will run in local mode.  If set to false the topology will be deployed to Storm nimbus");
+		options.addOption(
+				"debug",
+				true,
+				"OPTIONAL ARGUMENT [true|false] Storm debugging enabled.  Default value is true");
+		options.addOption(
+				"generator_spout",
+				true,
+				"REQUIRED ARGUMENT [true|false] Turn on test generator spout.  Default is set to false.  If test generator spout is turned on then kafka spout is turned off.  Instead the generator spout will read telemetry from file and ingest it into a topology");
+	}
+
+	public void parse() {
+		CommandLineParser parser = new BasicParser();
+
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, args);
+
+			if (cmd.hasOption("h"))
+				help();
+
+			if (cmd.hasOption("local_mode")) {
+
+				String local_value = cmd.getOptionValue("local_mode").trim()
+						.toLowerCase();
+
+				if (local_value.equals("true"))
+					local_mode = true;
+
+				else if (local_value.equals("false"))
+					local_mode = false;
+				else {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid value for local mode");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -local_mode="
+									+ cmd.getOptionValue("local_mode"));
+					help();
+				}
+			} else {
+				System.out
+						.println("[OpenSOC] ERROR: Invalid value for local mode");
+				help();
+			}
+			if (cmd.hasOption("generator_spout")) {
+
+				String local_value = cmd.getOptionValue("generator_spout").trim()
+						.toLowerCase();
+
+				if (local_value.equals("true"))
+					generator_spout = true;
+
+				else if (local_value.equals("false"))
+					generator_spout = false;
+				else {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid value for local generator_spout");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -generator_spout="
+									+ cmd.getOptionValue("generator_spout"));
+					help();
+				}
+			} else {
+				System.out
+						.println("[OpenSOC] ERROR: Invalid value for generator_spout");
+				help();
+			}
+			if (cmd.hasOption("config_path")) {
+
+				path = cmd.getOptionValue("config_path").trim();
+
+				File file = new File(path);
+
+				if (!file.isDirectory() || !file.exists()) {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid settings directory name given");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -config_path="
+									+ cmd.getOptionValue("config_path"));
+					help();
+				}
+			}
+
+			if (cmd.hasOption("debug")) {
+				String debug_value = cmd.getOptionValue("debug");
+
+				if (debug_value.equals("true"))
+					debug = true;
+				else if (debug_value.equals("false"))
+					debug = false;
+				else {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid value for debug_value");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -debug_value="
+									+ cmd.getOptionValue("debug_value"));
+					help();
+				}
+			}
+
+		} catch (ParseException e) {
+			System.out
+					.println("[OpenSOC] ERROR: Failed to parse command line arguments");
+			help();
+		}
+	}
+
+	private void help() {
+		// This prints out some help
+		HelpFormatter formater = new HelpFormatter();
+
+		formater.printHelp("Topology Options:", options);
+
+		System.out
+				.println("[OpenSOC] Example usage: \n storm jar OpenSOC-Topologies-0.3BETA-SNAPSHOT.jar com.opensoc.topology.Bro -local_mode true -config_path OpenSOC_Configs/ -generator_spout true");
+
+		System.exit(0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/ErrorGenerator.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/ErrorGenerator.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/ErrorGenerator.java
new file mode 100644
index 0000000..c21205e
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/ErrorGenerator.java
@@ -0,0 +1,27 @@
+package com.opensoc.topologyhelpers;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.json.simple.JSONObject;
+
+public class ErrorGenerator {
+
+	public static JSONObject generateErrorMessage(String message, String exception)
+	{
+		JSONObject error_message = new JSONObject();
+		
+		error_message.put("time", System.currentTimeMillis());
+		try {
+			error_message.put("hostname", InetAddress.getLocalHost().getHostName());
+		} catch (UnknownHostException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		
+		error_message.put("message", message);
+		error_message.put("exception", exception);
+		
+		return error_message;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/SettingsLoader.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/SettingsLoader.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/SettingsLoader.java
new file mode 100644
index 0000000..bb2a460
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/topologyhelpers/SettingsLoader.java
@@ -0,0 +1,118 @@
+package com.opensoc.topologyhelpers;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+public class SettingsLoader {
+
+	public static JSONObject loadEnvironmentIdnetifier(String config_path)
+			throws ConfigurationException {
+		Configuration config = new PropertiesConfiguration(config_path);
+
+		String customer = config.getString("customer.id", "unknown");
+		String datacenter = config.getString("datacenter.id", "unknown");
+		String instance = config.getString("instance.id", "unknown");
+
+		JSONObject identifier = new JSONObject();
+		identifier.put("customer", customer);
+		identifier.put("datacenter", datacenter);
+		identifier.put("instance", instance);
+
+		return identifier;
+	}
+
+	public static JSONObject loadTopologyIdnetifier(String config_path)
+			throws ConfigurationException {
+		Configuration config = new PropertiesConfiguration(config_path);
+
+		String topology = config.getString("topology.id", "unknown");
+		String instance = config.getString("instance.id", "unknown");
+
+		JSONObject identifier = new JSONObject();
+		identifier.put("topology", topology);
+		identifier.put("topology_instance", instance);
+
+		return identifier;
+	}
+	
+
+	public static String generateTopologyName(JSONObject env, JSONObject topo) {
+
+		return (env.get("customer") + "_" + env.get("datacenter") + "_"
+				+ env.get("instance") + "_" + topo.get("topology") + "_" + topo.get("topology_instance"));
+	}
+	
+	public static JSONObject generateAlertsIdentifier(JSONObject env, JSONObject topo)
+	{
+		JSONObject identifier = new JSONObject();
+		identifier.put("environment", env);
+		identifier.put("topology", topo);
+		
+		return identifier;
+	}
+
+	public static Map<String, JSONObject> loadRegexAlerts(String config_path)
+			throws ConfigurationException, ParseException {
+		XMLConfiguration alert_rules = new XMLConfiguration();
+		alert_rules.setDelimiterParsingDisabled(true);
+		alert_rules.load(config_path);
+
+		int number_of_rules = alert_rules.getList("rule.pattern").size();
+
+		String[] patterns = alert_rules.getStringArray("rule.pattern");
+		String[] alerts = alert_rules.getStringArray("rule.alert");
+
+		JSONParser pr = new JSONParser();
+		Map<String, JSONObject> rules = new HashMap<String, JSONObject>();
+
+		for (int i = 0; i < patterns.length; i++)
+			rules.put(patterns[i], (JSONObject) pr.parse(alerts[i]));
+
+		return rules;
+	}
+
+	public static Map<String, JSONObject> loadKnownHosts(String config_path)
+			throws ConfigurationException, ParseException {
+		Configuration hosts = new PropertiesConfiguration(config_path);
+
+		Iterator<String> keys = hosts.getKeys();
+		Map<String, JSONObject> known_hosts = new HashMap<String, JSONObject>();
+		JSONParser parser = new JSONParser();
+
+		while (keys.hasNext()) {
+			String key = keys.next().trim();
+			JSONArray value = (JSONArray) parser.parse(hosts.getProperty(key)
+					.toString());
+			known_hosts.put(key, (JSONObject) value.get(0));
+		}
+
+		return known_hosts;
+	}
+
+	public static void printConfigOptions(PropertiesConfiguration config, String path_fragment)
+	{
+		Iterator<String> itr = config.getKeys();
+		
+		while(itr.hasNext())
+		{
+			String key = itr.next();
+			
+			if(key.contains(path_fragment))
+			{
+				
+				System.out.println("[OpenSOC] Key: " + key + " -> " + config.getString(key));
+			}
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/dependency-reduced-pom.xml b/opensoc-streaming/OpenSOC-DataLoads/dependency-reduced-pom.xml
new file mode 100644
index 0000000..679e46a
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/dependency-reduced-pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <parent>
+    <artifactId>OpenSOC-Streaming</artifactId>
+    <groupId>com.opensoc</groupId>
+    <version>0.3BETA-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>OpenSOC-DataLoads</artifactId>
+  <build>
+    <sourceDirectory>src</sourceDirectory>
+    <resources>
+      <resource>
+        <directory>src</directory>
+        <excludes>
+          <exclude>**/*.java</exclude>
+        </excludes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <artifactSet>
+                <excludes>
+                  <exclude>classworlds:classworlds</exclude>
+                  <exclude>junit:junit</exclude>
+                  <exclude>jmock:*</exclude>
+                  <exclude>*:xml-apis</exclude>
+                  <exclude>org.apache.maven:lib:tests</exclude>
+                  <exclude>log4j:log4j:jar:</exclude>
+                  <exclude>*:hbase:*</exclude>
+                </excludes>
+              </artifactSet>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>0.9.2-incubating</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>clojure</artifactId>
+          <groupId>org.clojure</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>clj-time</artifactId>
+          <groupId>clj-time</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>compojure</artifactId>
+          <groupId>compojure</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hiccup</artifactId>
+          <groupId>hiccup</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>ring-devel</artifactId>
+          <groupId>ring</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>ring-jetty-adapter</artifactId>
+          <groupId>ring</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>tools.logging</artifactId>
+          <groupId>org.clojure</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>math.numeric-tower</artifactId>
+          <groupId>org.clojure</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>tools.cli</artifactId>
+          <groupId>org.clojure</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>commons-exec</artifactId>
+          <groupId>org.apache.commons</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>curator-framework</artifactId>
+          <groupId>org.apache.curator</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>carbonite</artifactId>
+          <groupId>com.twitter</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>snakeyaml</artifactId>
+          <groupId>org.yaml</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>httpclient</artifactId>
+          <groupId>org.apache.httpcomponents</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>disruptor</artifactId>
+          <groupId>com.googlecode.disruptor</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jgrapht-core</artifactId>
+          <groupId>org.jgrapht</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>logback-classic</artifactId>
+          <groupId>ch.qos.logback</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>log4j-over-slf4j</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.2</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-DataLoads/pom.xml b/opensoc-streaming/OpenSOC-DataLoads/pom.xml
new file mode 100644
index 0000000..44df767
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-DataLoads/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
+	Foundation (ASF) under one or more contributor license agreements. See the 
+	NOTICE file distributed with this work for additional information regarding 
+	copyright ownership. The ASF licenses this file to You under the Apache License, 
+	Version 2.0 (the "License"); you may not use this file except in compliance 
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+	Unless required by applicable law or agreed to in writing, software distributed 
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+	the specific language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>com.opensoc</groupId>
+		<artifactId>OpenSOC-Streaming</artifactId>
+		<version>0.3BETA-SNAPSHOT</version>
+	</parent>
+	<artifactId>OpenSOC-DataLoads</artifactId>
+	<properties>
+
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>com.opensoc</groupId>
+			<artifactId>OpenSOC-Common</artifactId>
+			<version>${parent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+			<version>${global_storm_version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hbase</groupId>
+			<artifactId>hbase-client</artifactId>
+			<version>${global_hbase_version}</version>
+		</dependency>
+	</dependencies>
+	<build>
+		<sourceDirectory>src</sourceDirectory>
+		<resources>
+			<resource>
+				<directory>src</directory>
+				<excludes>
+					<exclude>**/*.java</exclude>
+				</excludes>
+			</resource>
+		</resources>
+		<plugins>
+			<plugin>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.7</source>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.3</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<excludes>
+									<exclude>classworlds:classworlds</exclude>
+									<exclude>junit:junit</exclude>
+									<exclude>jmock:*</exclude>
+									<exclude>*:xml-apis</exclude>
+									<exclude>org.apache.maven:lib:tests</exclude>
+									<exclude>log4j:log4j:jar:</exclude>
+									<exclude>*:hbase:*</exclude>
+								</excludes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>