You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2015/12/17 21:46:26 UTC

[24/26] incubator-metron git commit: replace opensoc-steaming version 0.4BETA with 0.6BETA 8e7a6b4ad9febbc4ea47ba7810c42cc94d4dee37

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml b/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
index dc7cba5..8d812a9 100644
--- a/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
+++ b/opensoc-streaming/OpenSOC-Alerts/src/main/resources/hbase-site.xml
@@ -1,90 +1,131 @@
-<!--Tue Feb 11 02:34:08 2014 -->
-<configuration>
-
-	<property>
-		<name>hbase.regionserver.global.memstore.lowerLimit</name>
-		<value>0.38</value>
-	</property>
-	<property>
-		<name>zookeeper.session.timeout</name>
-		<value>20</value>
-	</property>
-
-	<property>
-		<name>hbase.security.authorization</name>
-		<value>false</value>
-	</property>
-	<property>
-		<name>hbase.cluster.distributed</name>
-		<value>true</value>
-	</property>
-	
-	<property>
-		<name>hbase.hstore.flush.retries.number</name>
-		<value>120</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.block.multiplier</name>
-		<value>4</value>
-	</property>
-	<property>
-		<name>hbase.hstore.blockingStoreFiles</name>
-		<value>200</value>
-	</property>
-	<property>
-		<name>hbase.defaults.for.version.skip</name>
-		<value>true</value>
-	</property>
-	<property>
-		<name>hbase.regionserver.global.memstore.upperLimit</name>
-		<value>0.4</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.mslab.enabled</name>
-		<value>true</value>
-	</property>
-	<property>
-		<name>hbase.client.keyvalue.maxsize</name>
-		<value>10485760</value>
-	</property>
-	<property>
-		<name>hbase.superuser</name>
-		<value>hbase</value>
-	</property>
-	<property>
-		<name>hfile.block.cache.size</name>
-		<value>0.40</value>
-	</property>
-	<property>
-		<name>zookeeper.znode.parent</name>
-		<value>/hbase-unsecure</value>
-	</property>
-	<property>
-		<name>hbase.hregion.max.filesize</name>
-		<value>10737418240</value>
-	</property>
-	<property>
-		<name>hbase.zookeeper.property.clientPort</name>
-		<value>2181</value>
-	</property>
-	<property>
-		<name>hbase.security.authentication</name>
-		<value>simple</value>
-	</property>
-	<property>
-		<name>hbase.client.scanner.caching</name>
-		<value>100</value>
-	</property>
-	<property>
-		<name>hbase.hregion.memstore.flush.size</name>
-		<value>134217728</value>
-	</property>
-	<property>
-		<name>hbase.hregion.majorcompaction</name>
-		<value>86400000</value>
-	</property>
-	<property>
-		<name>hbase.client.write.buffer</name>
-		<value>500000000</value>
-	</property>
-</configuration>
\ No newline at end of file
+<!--Tue Apr  1 18:16:39 2014-->
+  <configuration>
+    <property>
+    <name>hbase.tmp.dir</name>
+    <value>/disk/h/hbase</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.chunkpool.maxsize</name>
+    <value>0.5</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.codecs</name>
+    <value>lzo,gz,snappy</value>
+  </property>
+    <property>
+    <name>hbase.hstore.flush.retries.number</name>
+    <value>120</value>
+  </property>
+    <property>
+    <name>hbase.client.keyvalue.maxsize</name>
+    <value>10485760</value>
+  </property>
+    <property>
+    <name>hbase.rootdir</name>
+    <value>hdfs://nn1:8020/apps/hbase/data</value>
+  </property>
+    <property>
+    <name>hbase.defaults.for.version.skip</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.client.scanner.caching</name>
+    <value>100</value>
+  </property>
+    <property>
+    <name>hbase.superuser</name>
+    <value>hbase</value>
+  </property>
+    <property>
+    <name>hfile.block.cache.size</name>
+    <value>0.40</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.checksum.verify</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.enabled</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>107374182400</value>
+  </property>
+    <property>
+    <name>hbase.cluster.distributed</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>zookeeper.session.timeout</name>
+    <value>30000</value>
+  </property>
+    <property>
+    <name>zookeeper.znode.parent</name>
+    <value>/hbase-unsecure</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.lowerLimit</name>
+    <value>0.38</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.handler.count</name>
+    <value>240</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.mslab.chunksize</name>
+    <value>8388608</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.quorum</name>
+    <value>zkpr1,zkpr2,zkpr3</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.useMulti</name>
+    <value>true</value>
+  </property>
+    <property>
+    <name>hbase.hregion.majorcompaction</name>
+    <value>86400000</value>
+  </property>
+    <property>
+    <name>hbase.hstore.blockingStoreFiles</name>
+    <value>200</value>
+  </property>
+    <property>
+    <name>hbase.zookeeper.property.clientPort</name>
+    <value>2181</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.flush.size</name>
+    <value>134217728</value>
+  </property>
+    <property>
+    <name>hbase.security.authorization</name>
+    <value>false</value>
+  </property>
+    <property>
+    <name>hbase.regionserver.global.memstore.upperLimit</name>
+    <value>0.4</value>
+  </property>
+    <property>
+    <name>hbase.hstore.compactionThreshold</name>
+    <value>4</value>
+  </property>
+    <property>
+    <name>hbase.hregion.memstore.block.multiplier</name>
+    <value>8</value>
+  </property>
+    <property>
+    <name>hbase.security.authentication</name>
+    <value>simple</value>
+  </property>
+    <property>
+    <name>dfs.client.read.shortcircuit</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.domain.socket.path</name>
+    <value>/var/run/hdfs/dn_socket</value>
+  </property>
+  </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Alerts/src/test/java/com/opensoc/alerts/adapters/AllAlertAdapterTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/test/java/com/opensoc/alerts/adapters/AllAlertAdapterTest.java b/opensoc-streaming/OpenSOC-Alerts/src/test/java/com/opensoc/alerts/adapters/AllAlertAdapterTest.java
new file mode 100644
index 0000000..65c74c0
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/test/java/com/opensoc/alerts/adapters/AllAlertAdapterTest.java
@@ -0,0 +1,166 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.opensoc.alerts.adapters;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Properties;
+
+import com.opensoc.test.AbstractConfigTest;
+import com.opensoc.alerts.adapters.AllAlertAdapter;
+
+ /**
+ * <ul>
+ * <li>Title: AllAlertAdapterTest</li>
+ * <li>Description: Tests for AllAlertAdapter</li>
+ * <li>Created: Oct 8, 2014</li>
+ * </ul>
+ * @version $Revision: 1.1 $
+ */
+public class AllAlertAdapterTest extends AbstractConfigTest {
+
+     /**
+     * The allAlertAdapter.
+     */
+    private static AllAlertAdapter allAlertAdapter=null;
+    
+     /**
+     * The connected.
+     */
+    private static boolean connected=false;
+
+    /**
+     * Constructs a new <code>AllAlertAdapterTest</code> instance.
+     * @param name
+     */
+    public AllAlertAdapterTest(String name) {
+        super(name);
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected static void setUpBeforeClass() throws Exception {
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    protected static void tearDownAfterClass() throws Exception {
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+
+    @SuppressWarnings("unchecked")
+    protected void setUp() throws Exception {
+          super.setUp("com.opensoc.alerts.adapters.AllAlertAdapter");
+          Properties prop = super.getTestProperties();
+          assertNotNull(prop);   
+       // this.setMode("global");
+        if(skipTests(this.getMode())){
+            System.out.println(getClass().getName()+" Skipping Tests !!Local Mode");
+            return;//skip tests
+       }else{      
+           Map<String, String> settings = super.getSettings();
+           @SuppressWarnings("rawtypes")
+        Class loaded_class = Class.forName("com.opensoc.alerts.adapters.AllAlertAdapter");
+           @SuppressWarnings("rawtypes")
+        Constructor constructor = loaded_class.getConstructor(new Class[] { Map.class});
+           
+           AllAlertAdapterTest.allAlertAdapter = (AllAlertAdapter) constructor.newInstance(settings);
+            // AllAlertAdapterTest.allAlertAdapter = new AllAlertAdapter(settings)
+      }
+    }
+
+    /* 
+     * (non-Javadoc)
+     * @see junit.framework.TestCase#tearDown()
+     */
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+
+    /**
+     * Test method for {@link com.opensoc.alerts.adapters.AlllterAdapter#initialize()}.
+     */
+    public void testInitializeAdapter() {
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{        
+           
+        boolean initialized =AllAlertAdapterTest.getAllAlertAdapter().initialize();
+        assertTrue(initialized);
+       }
+    }
+    
+    /**
+     * Test method for containsAlertId(@link  com.opensoc.alerts.adapters.AlllterAdapter#containsAlertId()}.
+     */
+    public void testContainsAlertId(){
+        if(skipTests(this.getMode())){
+            return;//skip tests
+       }else{          
+            boolean containsAlert =AllAlertAdapterTest.getAllAlertAdapter().containsAlertId("test");
+            assertFalse(containsAlert);
+       }
+    }
+ 
+   
+
+    /**
+     * Returns the allAlertAdapter.
+     * @return the allAlertAdapter.
+     */
+    
+    public static AllAlertAdapter getAllAlertAdapter() {
+        return allAlertAdapter;
+    }
+
+    /**
+     * Sets the allAlertAdapter.
+     * @param allAlertAdapter the allAlertAdapter.
+     */
+    
+    public static void setAllAlertAdapter(AllAlertAdapter allAlertAdapter) {
+    
+        AllAlertAdapterTest.allAlertAdapter = allAlertAdapter;
+    }
+    /**
+     * Returns the connected.
+     * @return the connected.
+     */
+    
+    public static boolean isConnected() {
+        return connected;
+    }
+
+    /**
+     * Sets the connected.
+     * @param connected the connected.
+     */
+    
+    public static void setConnected(boolean connected) {
+    
+        AllAlertAdapterTest.connected = connected;
+    }    
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Alerts/src/test/resources/AllAlertAdapterTest.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/test/resources/AllAlertAdapterTest.properties b/opensoc-streaming/OpenSOC-Alerts/src/test/resources/AllAlertAdapterTest.properties
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/test/resources/AllAlertAdapterTest.properties
@@ -0,0 +1 @@
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json b/opensoc-streaming/OpenSOC-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
new file mode 100644
index 0000000..c4f2a82
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/test/resources/TestSchemas/AllAlertAdapterSchema.json
@@ -0,0 +1,42 @@
+{
+"title": "GeoMySql Schema",
+"type": "object",
+"properties": {
+
+         "city"    : {
+					   "type": "string"
+				  },
+		 "country" : {
+						"type": "string"
+					},
+		 "dmaCode" :
+		 			 {
+						"type": "string"
+					},
+	     "geoHash" : 
+	     			{
+						"type": "string"
+					},
+		 "latitude" : 
+		 			{
+						"type": "string"
+				   },
+		 "locID" : 
+		 			{
+					   "type": "string"
+				   },
+		 "location_point" : 
+		 			{
+					   "type": "string"
+				    },
+		 "longitude" : 
+		 			{
+						"type": "string"
+					},
+		 "postalCode" : 
+		 			{
+						"type": "string"
+					}
+   },
+   "required": ["city", "country", "dmaCode","latitude","locID","location_point","postalCode"]
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Alerts/src/test/resources/config/AllAlertAdapterTest.config
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Alerts/src/test/resources/config/AllAlertAdapterTest.config b/opensoc-streaming/OpenSOC-Alerts/src/test/resources/config/AllAlertAdapterTest.config
new file mode 100644
index 0000000..f6e5dd1
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Alerts/src/test/resources/config/AllAlertAdapterTest.config
@@ -0,0 +1,8 @@
+#Alerts Bolt
+bolt.alerts.adapter=com.opensoc.alerts.adapters.AllAlertAdapter
+com.opensoc.alerts.adapters.AllAlertAdapter.whitelist_table_name = ip_whitelist
+com.opensoc.alerts.adapters.AllAlertAdapter.blacklist_table_name = ip_blacklist
+com.opensoc.alerts.adapters.AllAlertAdapter.quorum=zkpr1,zkpr2,zkpr3
+com.opensoc.alerts.adapters.AllAlertAdapter.port=2181
+com.opensoc.alerts.adapters.AllAlertAdapter._MAX_CACHE_SIZE_OBJECTS_NUM=3600
+com.opensoc.alerts.adapters.AllAlertAdapter._MAX_TIME_RETAIN_MINUTES=1000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/.gitignore
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/.gitignore b/opensoc-streaming/OpenSOC-Common/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/pom.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/pom.xml b/opensoc-streaming/OpenSOC-Common/pom.xml
index 582093d..ad1382f 100644
--- a/opensoc-streaming/OpenSOC-Common/pom.xml
+++ b/opensoc-streaming/OpenSOC-Common/pom.xml
@@ -15,21 +15,23 @@
 	<parent>
 		<groupId>com.opensoc</groupId>
 		<artifactId>OpenSOC-Streaming</artifactId>
-		<version>0.3BETA-SNAPSHOT</version>
+		<version>0.6BETA</version>
 	</parent>
 	<artifactId>OpenSOC-Common</artifactId>
 	<name>OpenSOC-Common</name>
 	<description>Components common to all enrichments</description>
 	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 		<kafka.version>0.8.0</kafka.version>
 		<commons.config.version>1.10</commons.config.version>
 		<hbase.version>0.98.5-hadoop2</hbase.version>
 	</properties>
 	<repositories>
 		<repository>
-			<id>Kraken-Repo</id>
-			<name>Kraken Repository</name>
-			<url>http://download.krakenapps.org</url>
+			<id>OpenSOC-Kraken-Repo</id>
+			<name>OpenSOC Kraken Repository</name>
+			<url>https://raw.github.com/opensoc/kraken/mvn-repo</url>
 		</repository>
 	</repositories>
 	<dependencies>
@@ -43,6 +45,15 @@
 			<artifactId>storm-core</artifactId>
 			<version>${global_storm_version}</version>
 			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+				
+				   <artifactId>servlet-api</artifactId>
+				
+				   <groupId>javax.servlet</groupId>
+				
+				  </exclusion>
+			</exclusions>			
 		</dependency>
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
@@ -82,7 +93,7 @@
 		<dependency>
 			<groupId>org.krakenapps</groupId>
 			<artifactId>kraken-pcap</artifactId>
-			<version>1.5.0</version>
+			<version>1.7.1</version>
 		</dependency>
 		<dependency>
 			<groupId>junit</groupId>
@@ -93,6 +104,21 @@
 			<groupId>org.apache.hbase</groupId>
 			<artifactId>hbase-client</artifactId>
 			<version>${hbase.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.github.fge</groupId>
+			<artifactId>json-schema-validator</artifactId>
+			<version>${global_json_schema_validator_version}</version>
 		</dependency>
 	</dependencies>
 
@@ -123,6 +149,18 @@
 		</plugins>
 	</reporting>
 	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<source>1.7</source>
+					<compilerArgument>-Xlint:unchecked</compilerArgument>
+					<target>1.7</target>
+				</configuration>
+			</plugin>
+		</plugins>
 		<resources>
 			<resource>
 				<directory>src/main/resources</directory>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/configuration/ConfigurationManager.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/configuration/ConfigurationManager.java
new file mode 100644
index 0000000..74f19a5
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/configuration/ConfigurationManager.java
@@ -0,0 +1,119 @@
+package com.opensoc.configuration;
+
+
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.configuration.CombinedConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.DefaultConfigurationBuilder;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration manager class which loads all 'config-definition.xml' files and
+ * creates a Configuration object which holds all properties from the underlying
+ * configuration resource
+ */
+public class ConfigurationManager {
+
+  /** configuration definition file name. */
+  private static String DEFAULT_CONFIG_DEFINITION_FILE_NAME = "config-definition.xml";
+
+  /** Stores a map with the configuration for each path specified. */
+  private static Map<String, Configuration> configurationsCache = new HashMap<String, Configuration>();
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger
+      .getLogger(ConfigurationManager.class);
+
+  /**
+   * Common method to load content of all configuration resources defined in
+   * 'config-definition.xml'.
+   * 
+   * @param configDefFilePath
+   *          the config def file path
+   * @return Configuration
+   */
+  public static Configuration getConfiguration(String configDefFilePath) {
+    if (configurationsCache.containsKey(configDefFilePath)) {
+      return configurationsCache.get(configDefFilePath);
+    }
+    CombinedConfiguration configuration = null;
+    synchronized (configurationsCache) {
+      if (configurationsCache.containsKey(configDefFilePath)) {
+        return configurationsCache.get(configDefFilePath);
+      }
+      DefaultConfigurationBuilder builder = new DefaultConfigurationBuilder();
+      String fielPath = getConfigDefFilePath(configDefFilePath);
+      LOGGER.info("loading from 'configDefFilePath' :" + fielPath);
+      builder.setFile(new File(fielPath));
+      try {
+        configuration = builder.getConfiguration(true);
+        configurationsCache.put(fielPath, configuration);
+      } catch (ConfigurationException e) {
+        LOGGER.info("Exception in loading property files.", e);
+      }
+    }
+    return configuration;
+  }
+
+  /**
+   * Removes the configuration created from a config definition file located at
+   * 'configDefFilePath'.
+   * 
+   * @param configDefFilePath
+   *          path to the config definition file
+   */
+  public static void clearConfiguration(String configDefFilePath) {
+    configurationsCache.remove(configDefFilePath);
+  }
+
+  /**
+   * Gets the configuration.
+   * 
+   * @return the configuration
+   */
+  public static Configuration getConfiguration() {
+    return getConfiguration(null);
+  }
+
+  /**
+   * Returns the 'config-definition.xml' file path. 1. If the param
+   * 'configDefFilePath' has a valid value, returns configDefFilePath 2. If the
+   * system property key 'configDefFilePath' has a valid value, returns the
+   * value 3. By default, it returns the file name 'config-definition.xml'
+   * 
+   * @param configDefFilePath
+   *          given input path to the config definition file
+   * @return the config def file path
+   */
+  private static String getConfigDefFilePath(String configDefFilePath) {
+    if (StringUtils.isNotEmpty(configDefFilePath)) {
+      return configDefFilePath;
+    }
+    return DEFAULT_CONFIG_DEFINITION_FILE_NAME;
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the args
+   * @throws InterruptedException
+   *           the interrupted exception
+   */
+  public static void main(String[] args) throws InterruptedException {
+    Configuration config = ConfigurationManager
+        .getConfiguration("/Users/Sayi/Documents/config/config-definition-dpi.xml");
+    System.out.println("elastic.search.cluster ="
+        + config.getString("elastic.search.cluster"));
+    Thread.sleep(10000);
+    System.out.println("storm.topology.dpi.bolt.es-index.index.name ="
+        + config.getString("storm.topology.dpi.bolt.es-index.index.name"));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/dataloads/interfaces/ThreatIntelSource.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/dataloads/interfaces/ThreatIntelSource.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/dataloads/interfaces/ThreatIntelSource.java
new file mode 100644
index 0000000..e19646a
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/dataloads/interfaces/ThreatIntelSource.java
@@ -0,0 +1,11 @@
+package com.opensoc.dataloads.interfaces;
+
+import java.util.Iterator;
+import org.apache.commons.configuration.Configuration;
+import org.json.simple.JSONObject;
+
+public interface ThreatIntelSource extends Iterator<JSONObject> {
+
+	void initializeSource(Configuration config);
+	void cleanupSource();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
index 9c8f604..ef155f1 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/hbase/HBaseBolt.java
@@ -5,16 +5,9 @@ package com.opensoc.hbase;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 
-import com.opensoc.topologyhelpers.ErrorGenerator;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
@@ -23,6 +16,8 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+import com.opensoc.helpers.topology.ErrorGenerator;
+
 /**
  * A Storm bolt for putting data into HBase.
  * <p>
@@ -76,12 +71,9 @@ public class HBaseBolt implements IRichBolt {
     try {
       this.connector.getTable().put(conf.getPutFromTuple(input));
     } catch (IOException ex) {
-    	
-        String error_as_string = org.apache.commons.lang.exception.ExceptionUtils
-  				.getStackTrace(ex);
 
   		JSONObject error = ErrorGenerator.generateErrorMessage(
-  				"Alerts problem: " + input.getBinary(0), error_as_string);
+  				"Alerts problem: " + input.getBinary(0), ex);
   		collector.emit("error", new Values(error));
   		
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/services/PcapServiceCli.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/services/PcapServiceCli.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/services/PcapServiceCli.java
new file mode 100644
index 0000000..70f8683
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/services/PcapServiceCli.java
@@ -0,0 +1,110 @@
+package com.opensoc.helpers.services;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class PcapServiceCli {
+
+	private String[] args = null;
+	private Options options = new Options();
+
+	int port = 8081;
+	String uri = "/pcapGetter";
+
+	public int getPort() {
+		return port;
+	}
+
+	public void setPort(int port) {
+		this.port = port;
+	}
+
+	public String getUri() {
+		return uri;
+	}
+
+	public void setUri(String uri) {
+		this.uri = uri;
+	}
+
+	public PcapServiceCli(String[] args) {
+
+		this.args = args;
+
+		Option help = new Option("h", "Display help menue");
+		options.addOption(help);
+		options.addOption(
+				"port",
+				true,
+				"OPTIONAL ARGUMENT [portnumber] If this argument sets the port for starting the service.  If this argument is not set the port will start on defaut port 8081");
+		options.addOption(
+				"endpoint_uri",
+				true,
+				"OPTIONAL ARGUMENT [/uri/to/service] This sets the URI for the service to be hosted.  The default URI is /pcapGetter");
+	}
+
+	public void parse() {
+		CommandLineParser parser = new BasicParser();
+
+		CommandLine cmd = null;
+
+		try {
+			cmd = parser.parse(options, args);
+		} catch (ParseException e1) {
+
+			e1.printStackTrace();
+		}
+
+		if (cmd.hasOption("h"))
+			help();
+
+		if (cmd.hasOption("port")) {
+
+			try {
+				port = Integer.parseInt(cmd.getOptionValue("port").trim());
+			} catch (Exception e) {
+
+				System.out.println("[OpenSOC] Invalid value for port entered");
+				help();
+			}
+		}
+		if (cmd.hasOption("endpoint_uri")) {
+
+			try {
+
+				if (uri == null || uri.equals(""))
+					throw new Exception("invalid uri");
+
+				uri = cmd.getOptionValue("uri").trim();
+
+				if (uri.charAt(0) != '/')
+					uri = "/" + uri;
+
+				if (uri.charAt(uri.length()) == '/')
+					uri = uri.substring(0, uri.length() - 1);
+
+			} catch (Exception e) {
+				System.out.println("[OpenSOC] Invalid URI entered");
+				help();
+			}
+		}
+
+	}
+
+	private void help() {
+		// This prints out some help
+		HelpFormatter formater = new HelpFormatter();
+
+		formater.printHelp("Topology Options:", options);
+
+		// System.out
+		// .println("[OpenSOC] Example usage: \n storm jar OpenSOC-Topologies-0.3BETA-SNAPSHOT.jar com.opensoc.topology.Bro -local_mode true -config_path OpenSOC_Configs/ -generator_spout true");
+
+		System.exit(0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/Cli.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/Cli.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/Cli.java
new file mode 100644
index 0000000..0d9486e
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/Cli.java
@@ -0,0 +1,186 @@
+package com.opensoc.helpers.topology;
+
+import java.io.File;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/**
+ * Command line argument parser for OpenSOC topologies.  Supported options:
+ * -h (help), -config_path, -local_mode, -debug and -generator_spout.  The
+ * parsed values are exposed through getters; any invalid or missing
+ * required value prints the usage text via help(), which exits the JVM.
+ */
+public class Cli {
+
+	private String[] args = null;
+	private Options options = new Options();
+
+	// Parsed values; the initializers below are the documented defaults.
+	private String path = null;
+	private boolean debug = true;
+	private boolean local_mode = true;
+	private boolean generator_spout = false;
+
+	public boolean isGenerator_spout() {
+		return generator_spout;
+	}
+
+	public void setGenerator_spout(boolean generator_spout) {
+		this.generator_spout = generator_spout;
+	}
+
+	public String getPath() {
+		return path;
+	}
+
+	public void setPath(String path) {
+		this.path = path;
+	}
+
+	public boolean isDebug() {
+		return debug;
+	}
+
+	public void setDebug(boolean debug) {
+		this.debug = debug;
+	}
+
+	public boolean isLocal_mode() {
+		return local_mode;
+	}
+
+	public void setLocal_mode(boolean local_mode) {
+		this.local_mode = local_mode;
+	}
+
+	/**
+	 * Registers the supported options; parsing happens later in parse().
+	 *
+	 * @param args raw command line arguments as passed to main()
+	 */
+	public Cli(String[] args) {
+
+		this.args = args;
+
+		Option help = new Option("h", "Display help menu");
+		options.addOption(help);
+		options.addOption(
+				"config_path",
+				true,
+				"OPTIONAL ARGUMENT [/path/to/configs] Path to configuration folder. If not provided topology will initialize with default configs");
+		options.addOption(
+				"local_mode",
+				true,
+				"REQUIRED ARGUMENT [true|false] Local mode or cluster mode.  If set to true the topology will run in local mode.  If set to false the topology will be deployed to Storm nimbus");
+		options.addOption(
+				"debug",
+				true,
+				"OPTIONAL ARGUMENT [true|false] Storm debugging enabled.  Default value is true");
+		options.addOption(
+				"generator_spout",
+				true,
+				"REQUIRED ARGUMENT [true|false] Turn on test generator spout.  Default is set to false.  If test generator spout is turned on then kafka spout is turned off.  Instead the generator spout will read telemetry from file and ingest it into a topology");
+	}
+
+	/**
+	 * Parses the arguments supplied to the constructor and populates the
+	 * fields read by the getters above.
+	 */
+	public void parse() {
+		CommandLineParser parser = new BasicParser();
+
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, args);
+
+			if (cmd.hasOption("h"))
+				help();
+
+			if (cmd.hasOption("local_mode")) {
+
+				String local_value = cmd.getOptionValue("local_mode").trim()
+						.toLowerCase();
+
+				if (local_value.equals("true"))
+					local_mode = true;
+
+				else if (local_value.equals("false"))
+					local_mode = false;
+				else {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid value for local mode");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -local_mode="
+									+ cmd.getOptionValue("local_mode"));
+					help();
+				}
+			} else {
+				System.out
+						.println("[OpenSOC] ERROR: Invalid value for local mode");
+				help();
+			}
+			if (cmd.hasOption("generator_spout")) {
+
+				String local_value = cmd.getOptionValue("generator_spout").trim()
+						.toLowerCase();
+
+				if (local_value.equals("true"))
+					generator_spout = true;
+
+				else if (local_value.equals("false"))
+					generator_spout = false;
+				else {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid value for local generator_spout");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -generator_spout="
+									+ cmd.getOptionValue("generator_spout"));
+					help();
+				}
+			} else {
+				System.out
+						.println("[OpenSOC] ERROR: Invalid value for generator_spout");
+				help();
+			}
+			if (cmd.hasOption("config_path")) {
+
+				path = cmd.getOptionValue("config_path").trim();
+
+				File file = new File(path);
+
+				if (!file.isDirectory() || !file.exists()) {
+					System.out
+							.println("[OpenSOC] ERROR: Invalid settings directory name given");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -config_path="
+									+ cmd.getOptionValue("config_path"));
+					help();
+				}
+			}
+
+			if (cmd.hasOption("debug")) {
+				// Normalize the value the same way the other boolean
+				// options do before comparing.
+				String debug_value = cmd.getOptionValue("debug").trim()
+						.toLowerCase();
+
+				if (debug_value.equals("true"))
+					debug = true;
+				else if (debug_value.equals("false"))
+					debug = false;
+				else {
+					// Fixed: previously looked up the nonexistent option
+					// "debug_value", which always printed null here.
+					System.out
+							.println("[OpenSOC] ERROR: Invalid value for debug");
+					System.out
+							.println("[OpenSOC] ERROR: Using cli argument -debug="
+									+ cmd.getOptionValue("debug"));
+					help();
+				}
+			}
+
+		} catch (ParseException e) {
+			System.out
+					.println("[OpenSOC] ERROR: Failed to parse command line arguments");
+			help();
+		}
+	}
+
+	/**
+	 * Prints the formatted option summary and an example invocation to
+	 * stdout, then terminates the JVM.  Never returns.
+	 */
+	private void help() {
+		// This prints out some help
+		HelpFormatter formater = new HelpFormatter();
+
+		formater.printHelp("Topology Options:", options);
+
+		System.out
+				.println("[OpenSOC] Example usage: \n storm jar OpenSOC-Topologies-0.3BETA-SNAPSHOT.jar com.opensoc.topology.Bro -local_mode true -config_path OpenSOC_Configs/ -generator_spout true");
+
+		System.exit(0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/ErrorGenerator.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/ErrorGenerator.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/ErrorGenerator.java
new file mode 100644
index 0000000..97f0ba7
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/ErrorGenerator.java
@@ -0,0 +1,37 @@
+package com.opensoc.helpers.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.simple.JSONObject;
+
+/**
+ * Builds standardized OpenSOC error reports as JSON objects so that every
+ * topology emits failures in the same shape.
+ */
+public class ErrorGenerator {
+
+	/**
+	 * Creates an error report containing the current time, the reporting
+	 * hostname, a caller supplied message, the exception summary and its
+	 * full stack trace.
+	 *
+	 * @param message human readable description of what failed
+	 * @param e       the exception that triggered the report
+	 * @return JSON object with keys: time, hostname, message, exception, stack
+	 */
+	@SuppressWarnings("unchecked")
+	public static JSONObject generateErrorMessage(String message, Exception e)
+	{
+		JSONObject error_message = new JSONObject();
+		
+		/*
+		 * Save full stack trace in object.
+		 */
+		String stackTrace = ExceptionUtils.getStackTrace(e);
+		
+		String exception = e.toString();
+		
+		error_message.put("time", System.currentTimeMillis());
+		try {
+			error_message.put("hostname", InetAddress.getLocalHost().getHostName());
+		} catch (UnknownHostException ex) {
+			// Fall back to a placeholder instead of silently omitting the
+			// key (the original only printed the stack trace).
+			error_message.put("hostname", "unknown");
+		}
+		
+		error_message.put("message", message);
+		error_message.put("exception", exception);
+		error_message.put("stack", stackTrace);
+		
+		return error_message;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/SettingsLoader.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/SettingsLoader.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/SettingsLoader.java
new file mode 100644
index 0000000..261d481
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/helpers/topology/SettingsLoader.java
@@ -0,0 +1,149 @@
+package com.opensoc.helpers.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * Static helpers for loading OpenSOC topology configuration: environment
+ * and topology identifiers, regex alert rules, known-host lists and
+ * generic key/value settings, read via Apache Commons Configuration.
+ */
+public class SettingsLoader {
+
+	/**
+	 * Loads customer.id / datacenter.id / instance.id from a properties
+	 * file into a JSON object; missing keys default to "unknown".
+	 * NOTE(review): method name misspells "Identifier" but is public API,
+	 * so it is kept as-is.
+	 */
+	@SuppressWarnings("unchecked")
+	public static JSONObject loadEnvironmentIdnetifier(String config_path)
+			throws ConfigurationException {
+		Configuration config = new PropertiesConfiguration(config_path);
+
+		String customer = config.getString("customer.id", "unknown");
+		String datacenter = config.getString("datacenter.id", "unknown");
+		String instance = config.getString("instance.id", "unknown");
+
+		JSONObject identifier = new JSONObject();
+		identifier.put("customer", customer);
+		identifier.put("datacenter", datacenter);
+		identifier.put("instance", instance);
+
+		return identifier;
+	}
+
+	/**
+	 * Loads topology.id / instance.id from a properties file into a JSON
+	 * object; missing keys default to "unknown".  Same misspelled-name
+	 * caveat as loadEnvironmentIdnetifier above.
+	 */
+	@SuppressWarnings("unchecked")
+	public static JSONObject loadTopologyIdnetifier(String config_path)
+			throws ConfigurationException {
+		Configuration config = new PropertiesConfiguration(config_path);
+
+		String topology = config.getString("topology.id", "unknown");
+		String instance = config.getString("instance.id", "unknown");
+
+		JSONObject identifier = new JSONObject();
+		identifier.put("topology", topology);
+		identifier.put("topology_instance", instance);
+
+		return identifier;
+	}
+	
+
+	/**
+	 * Builds the canonical topology name
+	 * customer_datacenter_instance_topology_topologyinstance from the two
+	 * identifier objects produced by the loaders above.
+	 */
+	public static String generateTopologyName(JSONObject env, JSONObject topo) {
+
+		return (env.get("customer") + "_" + env.get("datacenter") + "_"
+				+ env.get("instance") + "_" + topo.get("topology") + "_" + topo.get("topology_instance"));
+	}
+	
+	/** Wraps the environment and topology identifiers into one JSON object. */
+	@SuppressWarnings("unchecked")
+	public static JSONObject generateAlertsIdentifier(JSONObject env, JSONObject topo)
+	{
+		JSONObject identifier = new JSONObject();
+		identifier.put("environment", env);
+		identifier.put("topology", topo);
+		
+		return identifier;
+	}
+
+	/**
+	 * Loads regex alert rules from an XML file, mapping each rule.pattern
+	 * to its rule.alert parsed as JSON.
+	 * NOTE(review): assumes the pattern and alert arrays have the same
+	 * length — an extra pattern without a matching alert would throw
+	 * ArrayIndexOutOfBoundsException; confirm against the config schema.
+	 */
+	public static Map<String, JSONObject> loadRegexAlerts(String config_path)
+			throws ConfigurationException, ParseException {
+		XMLConfiguration alert_rules = new XMLConfiguration();
+		alert_rules.setDelimiterParsingDisabled(true);
+		alert_rules.load(config_path);
+
+		//int number_of_rules = alert_rules.getList("rule.pattern").size();
+
+		String[] patterns = alert_rules.getStringArray("rule.pattern");
+		String[] alerts = alert_rules.getStringArray("rule.alert");
+
+		JSONParser pr = new JSONParser();
+		Map<String, JSONObject> rules = new HashMap<String, JSONObject>();
+
+		for (int i = 0; i < patterns.length; i++)
+			rules.put(patterns[i], (JSONObject) pr.parse(alerts[i]));
+
+		return rules;
+	}
+
+	/**
+	 * Loads a known-hosts properties file where each value is a JSON array;
+	 * only the first element of each array is kept per key.
+	 */
+	public static Map<String, JSONObject> loadKnownHosts(String config_path)
+			throws ConfigurationException, ParseException {
+		Configuration hosts = new PropertiesConfiguration(config_path);
+
+		Iterator<String> keys = hosts.getKeys();
+		Map<String, JSONObject> known_hosts = new HashMap<String, JSONObject>();
+		JSONParser parser = new JSONParser();
+
+		while (keys.hasNext()) {
+			String key = keys.next().trim();
+			JSONArray value = (JSONArray) parser.parse(hosts.getProperty(key)
+					.toString());
+			known_hosts.put(key, (JSONObject) value.get(0));
+		}
+
+		return known_hosts;
+	}
+
+	/**
+	 * Prints every config key containing path_fragment, with its value,
+	 * to stdout.  Debug/diagnostic helper.
+	 */
+	public static void printConfigOptions(PropertiesConfiguration config, String path_fragment)
+	{
+		Iterator<String> itr = config.getKeys();
+		
+		while(itr.hasNext())
+		{
+			String key = itr.next();
+			
+			if(key.contains(path_fragment))
+			{
+				
+				System.out.println("[OpenSOC] Key: " + key + " -> " + config.getString(key));
+			}
+		}
+
+	}
+	
+	/** Prints each optional setting key/value pair to stdout. */
+	public static void printOptionalSettings(Map<String, String> settings)
+	{
+		for(String setting: settings.keySet())
+		{
+			System.out.println("[OpenSOC] Optional Setting: " + setting + " -> " +settings.get(setting));
+		}
+
+	}
+	
+	/**
+	 * Collects every config entry whose key contains path_fragment into a
+	 * map, with path_fragment stripped from the keys.
+	 */
+	public static Map<String, String> getConfigOptions(PropertiesConfiguration config, String path_fragment)
+	{
+		Iterator<String> itr = config.getKeys();
+		Map<String, String> settings = new HashMap<String, String>();
+		
+		while(itr.hasNext())
+		{
+			String key = itr.next();
+			
+			if(key.contains(path_fragment))
+			{
+				String tmp_key = key.replace(path_fragment, "");
+				settings.put(tmp_key, config.getString(key));
+			}
+		}
+
+		return settings;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
index 1f88342..dfdfc8e 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/index/interfaces/IndexAdapter.java
@@ -1,11 +1,15 @@
 package com.opensoc.index.interfaces;
 
+import java.util.Map;
+
 import org.json.simple.JSONObject;
 
 public interface IndexAdapter {
 
 	boolean initializeConnection(String ip, int port, String cluster_name,
-			String index_name, String document_name, int bulk) throws Exception;
+			String index_name, String document_name, int bulk, String date_format) throws Exception;
 
 	int bulkIndex(JSONObject raw_message);
+
+	void setOptionalSettings(Map<String, String> settings);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParser.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParser.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParser.java
index 7c88ae3..a54f1ce 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParser.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParser.java
@@ -2,17 +2,20 @@
 package com.opensoc.ise.parser;
 import java.io.*;
 import java.util.*;
+
 import org.json.simple.*;
 
 /**
 * Basic ISE data parser generated by JavaCC. 
 */
 public class ISEParser implements Serializable, ISEParserConstants {
-  private boolean nativeNumbers = false;
+ // private boolean nativeNumbers = false;
 
-  public ISEParser()
-  { //do nothing
-  }
+	private static final long serialVersionUID = -2531656825360044979L;
+
+	public ISEParser()
+	  { //do nothing
+	  }
 
   public ISEParser(String input)
   {
@@ -29,7 +32,8 @@ public class ISEParser implements Serializable, ISEParserConstants {
     return toReturn;
   }
 
-  final public boolean ensureEOF() throws ParseException {
+  @SuppressWarnings("unused")
+final public boolean ensureEOF() throws ParseException {
     switch (jj_nt.kind) {
     case COMMA:
       jj_consume_token(COMMA);
@@ -43,7 +47,8 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw new Error("Missing return statement in function");
   }
 
-  final public JSONObject innerMap() throws ParseException {
+  @SuppressWarnings({ "unchecked", "unused" })
+final public JSONObject innerMap() throws ParseException {
   final JSONObject json = new JSONObject();
   String key;
   Object value;
@@ -76,7 +81,8 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw new Error("Missing return statement in function");
   }
 
-  final public JSONObject object() throws ParseException {
+  @SuppressWarnings({ "unused", "unchecked" })
+final public JSONObject object() throws ParseException {
   final JSONObject json = new JSONObject();
   String key;
   Object value;
@@ -105,7 +111,8 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw new Error("Missing return statement in function");
   }
 
-  final public String objectKey() throws ParseException {
+  @SuppressWarnings("unused")
+final public String objectKey() throws ParseException {
   String k;
     k = string();
     //  System.out.println("key == " + k);
@@ -113,7 +120,8 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw new Error("Missing return statement in function");
   }
 
-  final public Object value() throws ParseException {
+  @SuppressWarnings({ "unused", "rawtypes" })
+final public Object value() throws ParseException {
   Object x;
   String eof = "EOF";
   Map m = null;
@@ -147,12 +155,14 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw new Error("Missing return statement in function");
   }
 
-  final public String nullValue() throws ParseException {
+  @SuppressWarnings("unused")
+final public String nullValue() throws ParseException {
     {if (true) return null;}
     throw new Error("Missing return statement in function");
   }
 
-  final public String tagString() throws ParseException {
+  @SuppressWarnings("unused")
+final public String tagString() throws ParseException {
   String output = "(tag=0)";
     jj_consume_token(TAG);
     jj_consume_token(STRING_BODY);
@@ -160,19 +170,22 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw new Error("Missing return statement in function");
   }
 
-  final public String blankValue() throws ParseException {
+  @SuppressWarnings("unused")
+final public String blankValue() throws ParseException {
     {if (true) return null;}
     throw new Error("Missing return statement in function");
   }
 
-  final public String string() throws ParseException {
+  @SuppressWarnings("unused")
+final public String string() throws ParseException {
   String s;
     jj_consume_token(STRING_BODY);
     {if (true) return token.image.trim();}
     throw new Error("Missing return statement in function");
   }
 
-  final public String braced_string() throws ParseException {
+  @SuppressWarnings("unused")
+final public String braced_string() throws ParseException {
   String s;
     jj_consume_token(BRACED_STRING);
     //  System.out.println("braced == " + token.image);
@@ -471,7 +484,9 @@ public class ISEParser implements Serializable, ISEParserConstants {
     throw generateParseException();
   }
 
-  static private final class LookaheadSuccess extends java.lang.Error { }
+  static private final class LookaheadSuccess extends java.lang.Error {
+
+	private static final long serialVersionUID = -5724812746511794505L; }
   final private LookaheadSuccess jj_ls = new LookaheadSuccess();
   private boolean jj_scan_token(int kind) {
     if (jj_scanpos == jj_lastpos) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParserTokenManager.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParserTokenManager.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParserTokenManager.java
index adf9401..9999452 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParserTokenManager.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/ise/parser/ISEParserTokenManager.java
@@ -1,8 +1,5 @@
 /* Generated By:JavaCC: Do not edit this line. ISEParserTokenManager.java */
 package com.opensoc.ise.parser;
-import java.io.*;
-import java.util.*;
-import org.json.simple.*;
 
 /** Token Manager. */
 class ISEParserTokenManager implements ISEParserConstants

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONEncoderHelper.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONEncoderHelper.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONEncoderHelper.java
index 38ad375..b388397 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONEncoderHelper.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONEncoderHelper.java
@@ -19,6 +19,7 @@ package com.opensoc.json.serialization;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
+
 import org.apache.commons.configuration.Configuration;
 import org.json.simple.JSONObject;
 
@@ -68,6 +69,7 @@ public class JSONEncoderHelper {
 
 	}
 
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public static JSONObject getJSON(Configuration config) {
 
 		JSONObject output = new JSONObject();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
index 08f3b44..c08444f 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/json/serialization/JSONKafkaSerializer.java
@@ -17,30 +17,32 @@
 
 package com.opensoc.json.serialization;
 
+import static com.opensoc.json.serialization.JSONDecoderHelper.getObject;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putBoolean;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putNull;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putNumber;
+import static com.opensoc.json.serialization.JSONEncoderHelper.putString;
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.Reader;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 
-import kafka.serializer.Decoder;
-import kafka.serializer.Encoder;
-import kafka.utils.VerifiableProperties;
-import static com.opensoc.json.serialization.JSONEncoderHelper.*;
-import static com.opensoc.json.serialization.JSONDecoderHelper.*;
-
 /**
  * JSON Serailization class for kafka. Implements kafka Encoder and Decoder
  * String, JSONObject, Number, Boolean,JSONObject.NULL JSONArray

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
index 700d3ab..b71e4f9 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/parser/interfaces/MessageParser.java
@@ -5,6 +5,7 @@ import org.json.simple.JSONObject;
 public interface MessageParser {
 	
 	void initializeParser();
+	void init();
 	JSONObject parse(byte[] raw_message);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
index 151e3d3..804387d 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PacketInfo.java
@@ -1,6 +1,7 @@
 package com.opensoc.pcap;
 
 import java.text.MessageFormat;
+import org.apache.log4j.Logger;
 
 import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
 import org.krakenapps.pcap.decoder.tcp.TcpPacket;
@@ -9,6 +10,9 @@ import org.krakenapps.pcap.file.GlobalHeader;
 import org.krakenapps.pcap.packet.PacketHeader;
 import org.krakenapps.pcap.packet.PcapPacket;
 
+import com.opensoc.pcap.Constants;
+import com.opensoc.pcap.PcapUtils;
+
 /**
  * The Class PacketInfo.
  * 
@@ -47,6 +51,9 @@ public class PacketInfo {
   /** The Constant udpHeaderJsonTemplateSB. */
   private static final StringBuffer udpHeaderJsonTemplateSB = new StringBuffer();
 
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PacketInfo.class);
+  
   static {
     globalHeaderJsonTemplateSB.append("<\"global_header\":<\"pcap_id\":\"").append("{0}").append('"');
     globalHeaderJsonTemplateSB.append(",\"inc_len\":").append("{1}");
@@ -232,6 +239,28 @@ public class PacketInfo {
   }
 
   /**
+   * Gets the short key
+   * 
+   * 
+   * @return the short key
+   */
+  public String getShortKey() {
+	// Ports default to 0 for protocols other than TCP/UDP (e.g. ICMP),
+	// since those packets carry no port information.
+	int sourcePort = 0;
+	int destinationPort = 0;
+	if(Constants.PROTOCOL_UDP == ipv4Packet.getProtocol()) {
+		sourcePort = udpPacket.getSourcePort();
+		destinationPort = udpPacket.getDestinationPort();
+	} else if (Constants.PROTOCOL_TCP == ipv4Packet.getProtocol()) {
+		sourcePort = tcpPacket.getSourcePort();
+		destinationPort = tcpPacket.getDestinationPort();
+	}
+	  
+	// Key is derived from the 5-tuple: src/dst address, protocol, ports.
+	return PcapUtils.getShortSessionKey(ipv4Packet.getSourceAddress().getHostAddress(), ipv4Packet.getDestinationAddress().getHostAddress(),
+	    ipv4Packet.getProtocol(), sourcePort, destinationPort);
+			 
+  }
+  
+  /**
    * Gets the json doc.
    * 
    * 
@@ -260,6 +289,7 @@ public class PacketInfo {
    */
   private String getJsonDocUsingSBAppend() {
 
+	
     StringBuffer jsonSb = new StringBuffer(1024);
 
     // global header
@@ -373,29 +403,52 @@ public class PacketInfo {
    */
   private String getJsonIndexDocUsingSBAppend() {
 
-    StringBuffer jsonSb = new StringBuffer(175);
+	Long ts_micro = getPacketTimeInNanos() / 1000L;
+	StringBuffer jsonSb = new StringBuffer(175);
 
-    jsonSb.append("{\"pcap_id\":\"").append(getKey());
+	jsonSb.append("{\"pcap_id\":\"").append(getShortKey());
     jsonSb.append("\",\"ip_protocol\":").append(ipv4Packet.getProtocol());
+    jsonSb.append(",\"ip_id\":").append(ipv4Packet.getId());
+    jsonSb.append(",\"frag_offset\":").append(ipv4Packet.getFragmentOffset());
+    jsonSb.append(",\"ts_micro\":").append(ts_micro);
+
 
     // tcp header
     if (tcpPacket != null) {
-      jsonSb.append(",\"src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
-      jsonSb.append("\",\"src_port\":").append(tcpPacket.getSourcePort());
-      jsonSb.append(",\"dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
-      jsonSb.append("\",\"dst_port\":").append(tcpPacket.getDestinationPort());
+      jsonSb.append(",\"ip_src_addr\":\"").append(tcpPacket.getSourceAddress().getHostAddress());
+      jsonSb.append("\",\"ip_src_port\":").append(tcpPacket.getSourcePort());
+      jsonSb.append(",\"ip_dst_addr\":\"").append(tcpPacket.getDestinationAddress().getHostAddress());
+      jsonSb.append("\",\"ip_dst_port\":").append(tcpPacket.getDestinationPort());
     }
 
     // udp headers
     if (udpPacket != null) {
-      jsonSb.append(",\"src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
-      jsonSb.append("\",\"src_port\":").append(udpPacket.getSourcePort());
-      jsonSb.append(",\"dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
-      jsonSb.append("\",\"dst_port\":").append(udpPacket.getDestinationPort());
+      jsonSb.append(",\"ip_src_addr\":\"").append(udpPacket.getSource().getAddress().getHostAddress());
+      jsonSb.append("\",\"ip_src_port\":").append(udpPacket.getSourcePort());
+      jsonSb.append(",\"ip_dst_addr\":\"").append(udpPacket.getDestination().getAddress().getHostAddress());
+      jsonSb.append("\",\"ip_dst_port\":").append(udpPacket.getDestinationPort());
     }
 
     jsonSb.append('}');
 
     return jsonSb.toString();
   }
+  
+  /**
+   * Returns the packet capture timestamp normalized to nanoseconds.  The
+   * pcap global header magic number determines the sub-second resolution:
+   * 0xa1b2c3d4 / 0xd4c3b2a1 (byte-swapped) mean microseconds, while
+   * 0xa1b23c4d / 0x4d3cb2a1 mean nanoseconds.  An unrecognized magic
+   * number falls back to the microsecond interpretation with a warning.
+   */
+  public long getPacketTimeInNanos()
+  {
+	  if ( getGlobalHeader().getMagicNumber() == 0xa1b2c3d4 || getGlobalHeader().getMagicNumber() == 0xd4c3b2a1 )
+	  {
+		  //Time is in micro assemble as nano
+		  LOG.info("Times are in micro according to the magic number");
+		  return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ; 
+	  }
+	  else if ( getGlobalHeader().getMagicNumber() == 0xa1b23c4d || getGlobalHeader().getMagicNumber() == 0x4d3cb2a1 ) {
+		//Time is in nano assemble as nano
+		  LOG.info("Times are in nano according to the magic number");
+		  return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() ; 
+	  }
+	  //Default assume time is in micro assemble as nano
+	  LOG.warn("Unknown magic number. Defaulting to micro");
+	  return getPacketHeader().getTsSec() * 1000000000L + getPacketHeader().getTsUsec() * 1000L ;  
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteOutputStream.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteOutputStream.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteOutputStream.java
new file mode 100644
index 0000000..8a5ad18
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapByteOutputStream.java
@@ -0,0 +1,288 @@
+// $codepro.audit.disable explicitThisUsage, lossOfPrecisionInCast
+package com.opensoc.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.PcapOutputStream;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapByteOutputStream.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapByteOutputStream implements PcapOutputStream {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger
+      .getLogger(PcapByteOutputStream.class);
+
+  /** The Constant MAX_CACHED_PACKET_NUMBER. */
+  private static final int MAX_CACHED_PACKET_NUMBER = 1000;
+
+  /** The cached packet num. */
+  private int cachedPacketNum = 0; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /** The baos. */
+  private ByteArrayOutputStream baos; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /** The list. */
+  private List<Byte> list; // NOPMD by sheetal on 1/29/14 2:34 PM
+
+  /**
+   * Instantiates a new pcap byte output stream.
+   * 
+   * @param baos
+   *          the baos
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos) {
+    this.baos = baos;
+    list = new ArrayList<Byte>();
+    createGlobalHeader();
+  }
+
+  /**
+   * Instantiates a new pcap byte output stream.
+   * 
+   * @param baos
+   *          the baos
+   * @param header
+   *          the header
+   */
+  public PcapByteOutputStream(ByteArrayOutputStream baos, GlobalHeader header) {
+    this.baos = baos;
+    list = new ArrayList<Byte>();
+    copyGlobalHeader(header);
+  }
+
+  /**
+   * Creates the global header.
+   */
+  private void createGlobalHeader() {
+    /* magic number(swapped) */
+    list.add((byte) 0xd4);
+    list.add((byte) 0xc3);
+    list.add((byte) 0xb2);
+    list.add((byte) 0xa1);
+
+    /* major version number */
+    list.add((byte) 0x02);
+    list.add((byte) 0x00);
+
+    /* minor version number */
+    list.add((byte) 0x04);
+    list.add((byte) 0x00);
+
+    /* GMT to local correction */
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* accuracy of timestamps */
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* max length of captured packets, in octets */
+    list.add((byte) 0xff);
+    list.add((byte) 0xff);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+
+    /* data link type(ethernet) */
+    list.add((byte) 0x01);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+    list.add((byte) 0x00);
+  }
+
+  /**
+   * Copy global header.
+   * 
+   * @param header
+   *          the header
+   */
+  private void copyGlobalHeader(GlobalHeader header) {
+    final byte[] magicNumber = intToByteArray(header.getMagicNumber());
+    final byte[] majorVersion = shortToByteArray(header.getMajorVersion());
+    final byte[] minorVersion = shortToByteArray(header.getMinorVersion());
+    final byte[] zone = intToByteArray(header.getThiszone());
+    final byte[] sigFigs = intToByteArray(header.getSigfigs());
+    final byte[] snapLen = intToByteArray(header.getSnaplen());
+    final byte[] network = intToByteArray(header.getNetwork());
+
+    list.add(magicNumber[0]);
+    list.add(magicNumber[1]);
+    list.add(magicNumber[2]);
+    list.add(magicNumber[3]);
+
+    list.add(majorVersion[1]);
+    list.add(majorVersion[0]);
+
+    list.add(minorVersion[1]);
+    list.add(minorVersion[0]);
+
+    list.add(zone[3]);
+    list.add(zone[2]);
+    list.add(zone[1]);
+    list.add(zone[0]);
+
+    list.add(sigFigs[3]);
+    list.add(sigFigs[2]);
+    list.add(sigFigs[1]);
+    list.add(sigFigs[0]);
+
+    list.add(snapLen[3]);
+    list.add(snapLen[2]);
+    list.add(snapLen[1]);
+    list.add(snapLen[0]);
+
+    list.add(network[3]);
+    list.add(network[2]);
+    list.add(network[1]);
+    list.add(network[0]);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#write(org.krakenapps.pcap.packet
+   * .PcapPacket)
+   */
+  /**
+   * Method write.
+   * 
+   * @param packet
+   *          PcapPacket
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#write(PcapPacket) * @see
+   *           org.krakenapps.pcap.PcapOutputStream#write(PcapPacket)
+   */
+ 
+  public void write(PcapPacket packet) throws IOException {
+    PacketHeader packetHeader = packet.getPacketHeader();
+
+    int tsSec = packetHeader.getTsSec();
+    int tsUsec = packetHeader.getTsUsec();
+    int inclLen = packetHeader.getInclLen();
+    int origLen = packetHeader.getOrigLen();
+
+    addInt(tsSec);
+    addInt(tsUsec);
+    addInt(inclLen);
+    addInt(origLen);
+
+    Buffer payload = packet.getPacketData();
+
+    try {
+      payload.mark();
+      while (true) {
+        list.add(payload.get());
+      }
+    } catch (BufferUnderflowException e) {
+      //LOG.debug("Ignorable exception while writing packet", e);
+      payload.reset();
+    }
+
+    cachedPacketNum++;
+    if (cachedPacketNum == MAX_CACHED_PACKET_NUMBER) {
+      flush();
+    }
+  }
+
+  /**
+   * Adds the int.
+   * 
+   * @param number
+   *          the number
+   */
+  private void addInt(int number) {
+    list.add((byte) (number & 0xff));
+    list.add((byte) ((number & 0xff00) >> 8));
+    list.add((byte) ((number & 0xff0000) >> 16));
+    list.add((byte) ((number & 0xff000000) >> 24));
+  }
+
+  /**
+   * Int to byte array.
+   * 
+   * @param number
+   *          the number
+   * 
+   * @return the byte[]
+   */
+  private byte[] intToByteArray(int number) {
+    return new byte[] { (byte) (number >>> 24), (byte) (number >>> 16),
+        (byte) (number >>> 8), (byte) number };
+  }
+
+  /**
+   * Short to byte array.
+   * 
+   * @param number
+   *          the number
+   * 
+   * @return the byte[]
+   */
+  private byte[] shortToByteArray(short number) {
+    return new byte[] { (byte) (number >>> 8), (byte) number };
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+  /**
+   * Method flush.
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#flush() * @see
+   *           org.krakenapps.pcap.PcapOutputStream#flush()
+   */
+ 
+  public void flush() throws IOException {
+    byte[] fileBinary = new byte[list.size()];
+    for (int i = 0; i < fileBinary.length; i++) {
+      fileBinary[i] = list.get(i);
+    }
+
+    list.clear();
+    baos.write(fileBinary);
+    cachedPacketNum = 0;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.krakenapps.pcap.PcapOutputStream#close()
+   */
+  /**
+   * Method close.
+   * 
+   * 
+   * @throws IOException
+   *           * @see org.krakenapps.pcap.PcapOutputStream#close() * @see
+   *           org.krakenapps.pcap.PcapOutputStream#close()
+   */
+ 
+  public void close() throws IOException {
+    flush();
+    baos.close(); // $codepro.audit.disable closeInFinally
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapMerger.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapMerger.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapMerger.java
new file mode 100644
index 0000000..392523b
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapMerger.java
@@ -0,0 +1,245 @@
+ package com.opensoc.pcap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapMerger.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapMerger {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+  
+  /** The comparator for PcapPackets */
+  private static PcapPacketComparator PCAP_PACKET_COMPARATOR = new PcapPacketComparator();
+
+  /**
+   * Instantiates a new pcap merger.
+   */
+  private PcapMerger() { // $codepro.audit.disable emptyMethod
+  }
+
+  /**
+   * Merge two pcap byte arrays.
+   * 
+   * @param baos
+   *          the baos
+   * @param pcaps
+   *          the pcaps
+   * 
+   * @throws IOException
+   *           if there is no byte array, no access permission, or other io
+   *           related problems.
+   */
+  // public static void merge(byte[] to, byte[] from) throws IOException {
+  // PcapByteInputStream is = null;
+  // PcapByteOutputStream os = null;
+  // ByteArrayOutputStream baos = null;
+  // try {
+  // is = new PcapByteInputStream(from);
+  // baos = new ByteArrayOutputStream();
+  // os = new PcapByteOutputStream(baos, is.getGlobalHeader());
+  //
+  // writePacket(is, os);
+  // } finally {
+  // closeInput(is);
+  // if (baos != null) {
+  // baos.close();
+  // }
+  // closeOutput(os);
+  // }
+  // }
+
+  public static void merge(ByteArrayOutputStream baos, List<byte[]> pcaps)
+      throws IOException {
+    PcapByteInputStream is = null;
+    PcapByteOutputStream os = null;
+    ByteArrayOutputStream unsortedBaos = new ByteArrayOutputStream();
+    
+    try {
+      int i = 1;
+      for (byte[] pcap : pcaps) {
+        is = new PcapByteInputStream(pcap);
+        if (i == 1) {
+          os = new PcapByteOutputStream(unsortedBaos, is.getGlobalHeader());
+        }
+
+        writePacket(is, os);
+        i++;
+        closeInput(is);
+      }
+    } finally {
+      if (unsortedBaos != null) {
+        unsortedBaos.close();
+      }
+      closeOutput(os);
+      sort(baos, unsortedBaos.toByteArray());
+    }
+  }
+
+  /**
+   * Merge byte array1 with byte array2, and write to output byte array. It
+   * doesn't hurt original pcap dump byte arrays.
+   * 
+   * @param baos
+   *          the baos
+   * @param pcaps
+   *          the pcaps
+   * 
+   * @throws IOException
+   *           if there are no source byte arrays, have no read and/or write
+   *           permissions, or anything else.
+   */
+  public static void merge(ByteArrayOutputStream baos, byte[]... pcaps) // $codepro.audit.disable
+                                                                        // overloadedMethods
+      throws IOException {
+    merge(baos, Arrays.asList(pcaps));
+
+  }
+  
+  /**
+   * Sort the potentially unsorted byte array according to the timestamp
+   * in the packet header
+   * 
+   * @param unsortedBytes
+   * 	a byte array of a pcap file
+   * 
+   * @return byte array of a pcap file with packets in chronological order
+   * 
+   * @throws IOException
+   * 	if there are no source byte arrays, no read and/or write 
+   * 	permission, or anything else.
+   */
+  private static void sort(ByteArrayOutputStream baos, byte[] unsortedBytes) throws IOException {
+	  PcapByteInputStream pcapIs = new PcapByteInputStream(unsortedBytes);
+	  PcapByteOutputStream pcapOs = new PcapByteOutputStream(baos, pcapIs.getGlobalHeader());
+	  PcapPacket packet;
+	  ArrayList<PcapPacket> packetList = new ArrayList<PcapPacket>();
+	  
+	  try {
+		  while (true) {
+			  packet = pcapIs.getPacket();
+			  if (packet == null)
+				  break;
+			  packetList.add(packet);
+			  LOG.debug("Presort packet: " + packet.getPacketHeader().toString());
+		  }
+	  } catch (EOFException e) {
+		  //LOG.debug("Ignorable exception in sort", e);
+	  }
+	  
+	  Collections.sort(packetList, PCAP_PACKET_COMPARATOR);
+	  for (PcapPacket p : packetList) {
+		  pcapOs.write(p);
+		  LOG.debug("Postsort packet: " + p.getPacketHeader().toString());
+	  }
+	  pcapOs.close();  
+  }
+  
+  /**
+   * Write packet.
+   * 
+   * @param is
+   *          the is
+   * @param os
+   *          the os
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private static void writePacket(PcapByteInputStream is,
+      PcapByteOutputStream os) throws IOException {
+    PcapPacket packet = null;
+    try {
+      while (true) {
+        packet = is.getPacket();
+        if (packet == null) {
+          break;
+        }
+        os.write(packet);
+      }
+    } catch (EOFException e) {
+      //LOG.debug("Ignorable exception in writePacket", e);
+    }
+
+  }
+
+  /**
+   * Close input.
+   * 
+   * @param is
+   *          the is
+   */
+  private static void closeInput(PcapByteInputStream is) {
+    if (is == null) {
+      return;
+    }
+    try {
+      is.close(); // $codepro.audit.disable closeInFinally
+    } catch (IOException e) {
+      LOG.error("Failed to close input stream", e);
+    }
+  }
+
+  /**
+   * Close output.
+   * 
+   * @param os
+   *          the os
+   */
+  private static void closeOutput(PcapByteOutputStream os) {
+    if (os == null) {
+      return;
+    }
+    try {
+      os.close();
+    } catch (IOException e) {
+      LOG.error("Failed to close output stream", e);
+
+    }
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static void main(String[] args) throws IOException {
+    byte[] b1 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.1.pcap"));
+    byte[] b2 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.2.pcap"));
+    byte[] b3 = FileUtils.readFileToByteArray(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.3.pcap"));
+
+    ByteArrayOutputStream boas = new ByteArrayOutputStream(); // $codepro.audit.disable
+                                                              // closeWhereCreated
+    PcapMerger.merge(boas, b1, b2, b3);
+
+    FileUtils.writeByteArrayToFile(new File(
+        "/Users/sheetal/Downloads/constructedTcpDump.automerged.1.2.pcap"),
+        boas.toByteArray(), false);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapPacketComparator.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapPacketComparator.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapPacketComparator.java
new file mode 100644
index 0000000..29a2414
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapPacketComparator.java
@@ -0,0 +1,22 @@
+package com.opensoc.pcap;
+
+import java.util.Comparator;
+
+import org.apache.log4j.Logger;
+
+import org.krakenapps.pcap.packet.PcapPacket;
+
+public class PcapPacketComparator implements Comparator<PcapPacket> {
+
+	/** The Constant LOG. */
+	private static final Logger LOG = Logger.getLogger(PcapMerger.class);
+	
+	public int compare(PcapPacket p1, PcapPacket p2) {
+
+		Long p1time = new Long(p1.getPacketHeader().getTsSec()) * 1000000L + new Long(p1.getPacketHeader().getTsUsec());
+		Long p2time = new Long(p2.getPacketHeader().getTsSec()) * 1000000L + new Long(p2.getPacketHeader().getTsUsec());
+		Long delta = p1time - p2time;
+		LOG.debug("p1time: " + p1time.toString() + " p2time: " + p2time.toString() + " delta: " + delta.toString());
+		return delta.intValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapParser.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapParser.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapParser.java
new file mode 100644
index 0000000..abc0873
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapParser.java
@@ -0,0 +1,183 @@
+package com.opensoc.pcap;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.krakenapps.pcap.decoder.ethernet.EthernetDecoder;
+import org.krakenapps.pcap.decoder.ethernet.EthernetType;
+import org.krakenapps.pcap.decoder.ip.IpDecoder;
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class PcapParser.
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public final class PcapParser {
+
+  /** The Constant LOG. */
+  private static final Logger LOG = Logger.getLogger(PcapParser.class);
+
+  /** The ETHERNET_DECODER. */
+  private static final EthernetDecoder ETHERNET_DECODER = new EthernetDecoder();
+
+  /** The ip decoder. */
+  private static final IpDecoder IP_DECODER = new IpDecoder();
+
+  // /** The tcp decoder. */
+  // private static final TcpDecoder TCP_DECODER = new TcpDecoder(new
+  // TcpPortProtocolMapper());
+  //
+  // /** The udp decoder. */
+  // private static final UdpDecoder UDP_DECODER = new UdpDecoder(new
+  // UdpPortProtocolMapper());
+
+  static {
+    // IP_DECODER.register(InternetProtocol.TCP, TCP_DECODER);
+    // IP_DECODER.register(InternetProtocol.UDP, UDP_DECODER);
+    ETHERNET_DECODER.register(EthernetType.IPV4, IP_DECODER);
+  }
+
+  /**
+   * Instantiates a new pcap parser.
+   */
+  private PcapParser() { // $codepro.audit.disable emptyMethod
+
+  }
+
+  /**
+   * Parses the.
+   * 
+   * @param tcpdump
+   *          the tcpdump
+   * @return the list * @throws IOException Signals that an I/O exception has
+   *         occurred. * @throws IOException * @throws IOException * @throws
+   *         IOException
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static List<PacketInfo> parse(byte[] tcpdump) throws IOException {
+    List<PacketInfo> packetInfoList = new ArrayList<PacketInfo>();
+
+    PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(tcpdump);
+
+    GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+    while (true) {
+      try
+
+      {
+        PcapPacket packet = pcapByteInputStream.getPacket();
+        // int packetCounter = 0;
+        // PacketHeader packetHeader = null;
+        // Ipv4Packet ipv4Packet = null;
+        TcpPacket tcpPacket = null;
+        UdpPacket udpPacket = null;
+        // Buffer packetDataBuffer = null;
+        int sourcePort = 0;
+        int destinationPort = 0;
+
+        // LOG.trace("Got packet # " + ++packetCounter);
+
+        // LOG.trace(packet.getPacketData());
+        ETHERNET_DECODER.decode(packet);
+
+        PacketHeader packetHeader = packet.getPacketHeader();
+        Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+          tcpPacket = TcpPacket.parse(ipv4Packet);
+
+        }
+
+        if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
+
+          Buffer packetDataBuffer = ipv4Packet.getData();
+          sourcePort = packetDataBuffer.getUnsignedShort();
+          destinationPort = packetDataBuffer.getUnsignedShort();
+
+          udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
+
+          udpPacket.setLength(packetDataBuffer.getUnsignedShort());
+          udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
+          packetDataBuffer.discardReadBytes();
+          udpPacket.setData(packetDataBuffer);
+        }
+
+        packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
+            ipv4Packet, tcpPacket, udpPacket));
+      } catch (NegativeArraySizeException ignored) {
+        LOG.debug("Ignorable exception while parsing packet.", ignored);
+      } catch (EOFException eof) { // $codepro.audit.disable logExceptions
+        // Ignore exception and break
+        break;
+      }
+    }
+    return packetInfoList;
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @throws InterruptedException
+   *           the interrupted exception
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+
+    double totalIterations = 1000000;
+    double parallelism = 64;
+    double targetEvents = 1000000;
+
+    File fin = new File("/Users/sheetal/Downloads/udp.pcap");
+    File fout = new File(fin.getAbsolutePath() + ".parsed");
+    byte[] pcapBytes = FileUtils.readFileToByteArray(fin);
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < totalIterations; i++) {
+      List<PacketInfo> list = parse(pcapBytes);
+
+      for (PacketInfo packetInfo : list) {
+        // FileUtils.writeStringToFile(fout, packetInfo.getJsonDoc(), true);
+        // FileUtils.writeStringToFile(fout, "\n", true);
+        // System.out.println(packetInfo.getJsonDoc());
+      }
+    }
+    long endTime = System.currentTimeMillis();
+
+    System.out.println("Time taken to process " + totalIterations + " events :"
+        + (endTime - startTime) + " milliseconds");
+
+    System.out
+        .println("With parallelism of "
+            + parallelism
+            + " estimated time to process "
+            + targetEvents
+            + " events: "
+            + (((((endTime - startTime) / totalIterations) * targetEvents) / parallelism) / 1000)
+            + " seconds");
+    System.out.println("With parallelism of " + parallelism
+        + " estimated # of events per second: "
+        + ((parallelism * 1000 * totalIterations) / (endTime - startTime))
+        + " events");
+    System.out.println("Expected Parallelism to process " + targetEvents
+        + " events in a second: "
+        + (targetEvents / ((1000 * totalIterations) / (endTime - startTime))));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
index 8d06caa..8f9520f 100644
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
+++ b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/PcapUtils.java
@@ -280,6 +280,33 @@ public class PcapUtils {
     return sb.toString();
   }
 
+  /**
+   * Gets the short session key. (5-tuple only)
+   * 
+   * @param srcIp
+   *          the src ip
+   * @param dstIp
+   *          the dst ip
+   * @param protocol
+   *          the protocol
+   * @param srcPort
+   *          the src port
+   * @param dstPort
+   *          the dst port
+   * @return the session key
+   */
+  public static String getShortSessionKey(String srcIp, String dstIp, int protocol,
+      int srcPort, int dstPort) {
+    String keySeperator = "-";
+    StringBuffer sb = new StringBuffer(40);
+    sb.append(convertIpv4IpToHex(srcIp)).append(keySeperator)
+        .append(convertIpv4IpToHex(dstIp)).append(keySeperator)
+        .append(protocol).append(keySeperator).append(srcPort)
+        .append(keySeperator).append(dstPort);
+
+    return sb.toString();
+  }
+  
   // public static String convertPortToHex(String portNumber) {
   // return convertPortToHex(Integer.valueOf(portNumber));
   //

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a919cc19/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java b/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java
deleted file mode 100644
index db2c2b2..0000000
--- a/opensoc-streaming/OpenSOC-Common/src/main/java/com/opensoc/pcap/asdf.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.opensoc.pcap;
-
-public class asdf {
-
-}